zscore => 1,
zincrby => 1,
append => 1,
+ publish => 1,
};
# we don't want DESTROY to fallback into AUTOLOAD
$command =~ s/.*://;
warn "## $command ",Dumper(@_) if $self->{debug};
-
+
+ my ($callback, $after) = (sub {}, sub {});
+
+ if ( $command eq 'subscribe' ) {
+ ($callback, $after) = @_ == 3 ? splice @_, 1, 2
+ : @_ == 2 ? (pop, sub {})
+ : die 'Invalid arguments for consume command';
+ }
+
my $send;
if ( defined $bulk_command->{$command} ) {
. length( $value )
. "\r\n$value\r\n"
;
- } else {
+ } elsif ( $command ne 'consume' ) {
$send
= uc($command)
. ' '
. "\r\n"
;
}
-
- warn ">> $send" if $self->{debug};
- print $sock $send;
+
+ if (defined $send) {
+ warn ">> $send" if $self->{debug};
+ print $sock $send;
+ }
if ( $command eq 'quit' ) {
close( $sock ) || die "can't close socket: $!";
return 1;
}
+
+ if ($command eq 'subscribe') {
+ $self->{subscribe} = 1;
+ }
+ elsif ($command eq 'unsubscribe') {
+ $self->{subscribe} = 0;
+ }
+
+ if ( $command eq 'subscribe' ) {
+ $after->( $self, $self->__read_result($command) );
+
+ my $messages = 0;
+
+ while ($self->{subscribe}) {
+ $callback->( $self, $self->__read_result($command) );
+ $messages++;
+ }
+
+ return $messages;
+ }
+ else {
+ return $self->__read_result($command);
+ }
+}
+sub __read_result {
+ my ($self, $command) = @_;
+
+ use bytes;
+
+ my $sock = $self->{sock} || die "no server connected";
+
my $result = <$sock> || die "can't read socket: $!";
Encode::_utf8_on($result);
warn "<< $result" if $self->{debug};
my @list = ( 0 .. $size );
foreach ( 0 .. $size ) {
- $list[ $_ ] = $self->__read_bulk( substr(<$sock>,1,-2) );
+ my $result = <$sock>;
+ $list[ $_ ] = substr($result, 0, 1) eq ':' ? substr($result, 1, -2) : $self->__read_bulk( substr($result,1,-2) );
}
warn "## list = ", Dumper( @list ) if $self->{debug};
--- /dev/null
+#!/usr/bin/perl
+
+use warnings;
+use strict;
+
+use Test::More tests => 7;
+use lib 'lib';
+
+BEGIN {
+ use_ok( 'Redis' );
+}
+
+diag "Publish / subscribe commands";
+
+ok( my $p = Redis->new(), 'publisher' );
+ok( my $s = Redis->new(), 'subscriber' );
+
+my $callback = sub {
+ my ($self, @result) = @_;
+
+ is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
+ is_deeply( [ $self->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
+};
+
+my $after = sub {
+ cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
+};
+
+ok $s->subscribe( 'channel-1', $callback, $after );
+