Add basic publish / subscribe functionality and tests.
authorJames Barton <jbarton@jbarton-laptop.ideaworks.com>
Wed, 2 Jun 2010 17:39:52 +0000 (12:39 -0500)
committerJames Barton <jbarton@jbarton-laptop.ideaworks.com>
Wed, 2 Jun 2010 17:39:52 +0000 (12:39 -0500)
lib/Redis.pm
t/30-Redis-PubSub.t [new file with mode: 0644]

index 5f476cf..8c4aad6 100644 (file)
@@ -69,6 +69,7 @@ my $bulk_command = {
        zscore => 1,
        zincrby => 1,
        append => 1,
+       publish => 1,
 };
 
 # we don't want DESTROY to fallback into AUTOLOAD
@@ -86,7 +87,15 @@ sub AUTOLOAD {
        $command =~ s/.*://;
 
        warn "## $command ",Dumper(@_) if $self->{debug};
-
+       
+       my ($callback, $after) = (sub {}, sub {});
+       
+    if ( $command eq 'subscribe' ) {
+        ($callback, $after) = @_ == 3 ? splice @_, 1, 2
+                            : @_ == 2 ? (pop, sub {})
+                            :           die 'Invalid arguments for consume command';
+    }
+    
        my $send;
 
        if ( defined $bulk_command->{$command} ) {
@@ -100,7 +109,7 @@ sub AUTOLOAD {
                        . length( $value )
                        . "\r\n$value\r\n"
                        ;
-       } else {
+       } elsif ( $command ne 'consume' ) {
                $send
                        = uc($command)
                        . ' '
@@ -108,15 +117,48 @@ sub AUTOLOAD {
                        . "\r\n"
                        ;
        }
-
-       warn ">> $send" if $self->{debug};
-       print $sock $send;
+       
+    if (defined $send) {
+           warn ">> $send" if $self->{debug};
+           print $sock $send;
+    }
 
        if ( $command eq 'quit' ) {
                close( $sock ) || die "can't close socket: $!";
                return 1;
        }
+       
+       if ($command eq 'subscribe') {
+           $self->{subscribe} = 1;
+       }
+       elsif ($command eq 'unsubscribe') {
+           $self->{subscribe} = 0;
+       }
+    
+    if ( $command eq 'subscribe' ) {
+        $after->( $self, $self->__read_result($command) );
+        
+        my $messages = 0;
+        
+        while ($self->{subscribe}) {
+            $callback->( $self, $self->__read_result($command) );
+            $messages++;
+        }
+        
+        return $messages;
+    }
+    else {
+        return $self->__read_result($command);
+    }
+}
 
+sub __read_result {
+    my ($self, $command) = @_;
+    
+       use bytes;
+
+       my $sock = $self->{sock} || die "no server connected";
+    
        my $result = <$sock> || die "can't read socket: $!";
        Encode::_utf8_on($result);
        warn "<< $result" if $self->{debug};
@@ -171,7 +213,8 @@ sub __read_multi_bulk {
 
        my @list = ( 0 .. $size );
        foreach ( 0 .. $size ) {
-               $list[ $_ ] = $self->__read_bulk( substr(<$sock>,1,-2) );
+           my $result = <$sock>;
+               $list[ $_ ] = substr($result, 0, 1) eq ':' ? substr($result, 1, -2) : $self->__read_bulk( substr($result,1,-2) );
        }
 
        warn "## list = ", Dumper( @list ) if $self->{debug};
diff --git a/t/30-Redis-PubSub.t b/t/30-Redis-PubSub.t
new file mode 100644 (file)
index 0000000..5d36856
--- /dev/null
@@ -0,0 +1,30 @@
+#!/usr/bin/perl
+
+use warnings;
+use strict;
+
+use Test::More tests => 7;
+use lib 'lib';
+
+BEGIN {
+       use_ok( 'Redis' );
+}
+
+diag "Publish / subscribe commands";
+
+ok( my $p = Redis->new(), 'publisher' );
+ok( my $s = Redis->new(), 'subscriber' );
+
+my $callback = sub {
+    my ($self, @result) = @_;
+
+    is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
+    is_deeply( [ $self->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
+};
+
+my $after = sub {
+    cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
+};
+
+ok $s->subscribe( 'channel-1', $callback, $after );
+