Add support for psubscribe/punsubscribe
authorJames Barton <james@neodon.com>
Tue, 8 Jun 2010 11:35:04 +0000 (06:35 -0500)
committerJames Barton <james@neodon.com>
Tue, 8 Jun 2010 11:35:04 +0000 (06:35 -0500)
lib/Redis.pm
t/30-Redis-PubSub.t

index c61ef2c..4e87dbe 100644 (file)
@@ -36,10 +36,12 @@ sub _sock {
 sub _new_sock {
     my ($self) = @_;
     
-    return IO::Socket::INET->new(
+    my $sock = IO::Socket::INET->new(
            PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
            Proto    => 'tcp',
     ) or die $!;
+    
+    return $sock;
 }
 
 sub _send {
@@ -84,7 +86,7 @@ sub AUTOLOAD {
                return 1;
         }
        
-           if ($command eq 'unsubscribe') {
+           if ($command eq 'unsubscribe' || $command eq 'punsubscribe') {
                my @results             = $self->_read_results();
                my $channels_subscribed = $results[-1];
                
@@ -161,12 +163,22 @@ sub _read_multi_bulk {
 
 sub subscribe {
     my ($self, %args) = @_;
+    return $self->_subscribe('subscribe', %args);
+}
+
+sub psubscribe {
+    my ($self, %args) = @_;
+    return $self->_subscribe('psubscribe', %args);
+}
+
+sub _subscribe {
+    my ($self, $command, %args) = @_;
     
     $args{consume}     ||= sub {};
     $args{subscribe}   ||= sub {};
     $args{unsubscribe} ||= sub {};
 
-       $self->_send_command('subscribe', @{ $args{channels} });
+       $self->_send_command($command, @{ $args{channels} });
 
     $self->{subscribed} = 1;
     
index 32eaf42..5929615 100644 (file)
@@ -3,7 +3,7 @@
 use warnings;
 use strict;
 
-use Test::More tests => 7;
+use Test::More tests => 20;
 use lib 'lib';
 
 BEGIN {
@@ -15,20 +15,65 @@ diag "Publish / subscribe commands";
 ok( my $p = Redis->new(), 'publisher' );
 ok( my $s = Redis->new(), 'subscriber' );
 
-my $consume = sub {
-    my ($self, @result) = @_;
+cmp_ok $s->subscribe(
+    channels  => ['nonexistent'],
+    subscribe => sub {
+        cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 0, '0 clients received published message' );
+        is_deeply( [ $s->unsubscribe( 'nonexistent' ) ], [ 'unsubscribe', 'nonexistent', 0 ] );
+    },
+    consume     => sub { die "Shouldn't receive messages on nonexistent channel" },
+    unsubscribe => sub { 123 },
+), '==', 123;
 
-    is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
-    is_deeply( [ $self->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
-};
+cmp_ok $s->subscribe(
+    channels  => ['channel-1'],
+    subscribe => sub {
+        cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
+    },
+    consume => sub {
+        my ($self, @result) = @_;
 
-my $subscribe = sub {
-    cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
-};
+        is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
+        is_deeply( [ $s->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
+    },
+    unsubscribe => sub { 234 },
+), '==', 234;
 
-my $unsubscribe = sub {
-    return 234;
-};
+my @consumers = (
+    sub {
+        my ($self, @result) = @_;
+        is_deeply( [ @result ], ['pmessage', 'channel-*', 'channel-1', 'test message'] )
+    },
+    sub {
+        my ($self, @result) = @_;
+        is_deeply( [ @result ], ['pmessage', 'channel-*', 'channel-2', 'test message'] );
+        is_deeply( [ $s->punsubscribe( 'channel-*' ) ], [ 'punsubscribe', 'channel-*', 0 ] );
+    }
+);
+
+cmp_ok $s->psubscribe(
+    channels  => ['channel-*'],
+    subscribe => sub {
+        cmp_ok( $p->publish( 'channel-1',   'test message' ), '==', 1, '1 clients received published message' );
+        cmp_ok( $p->publish( 'channel-2',   'test message' ), '==', 1, '1 clients received published message' );
+        cmp_ok( $p->publish( 'nonexistent', 'test message' ), '==', 0, '0 clients received published message' );
+    },
+    consume => sub {
+        my ($self, @result) = @_;
+        my $consumer = shift @consumers;
+        $consumer->($self, @result);
+    },
+    unsubscribe => sub { 345 },
+), '==', 345;
+
+cmp_ok $s->psubscribe(
+    channels    => ['nomatch.*'],
+    subscribe => sub {
+        cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 0, '0 clients received published message' );
+        is_deeply( [ $s->punsubscribe( 'nomatch.*' ) ], [ 'punsubscribe', 'nomatch.*', 0 ] );
+    },
+    consume     => sub { die "Shouldn't receive messages on nonexistent channel" },
+    unsubscribe => sub { 456 },
+), '==', 456;
 
-cmp_ok $s->subscribe( channels => ['channel-1'], subscribe => $subscribe, consume => $consume, unsubscribe => $unsubscribe ), '==', 234;