Added support for Redis PubSub:
authorPedro Melo <melo@simplicidade.org>
Sat, 7 Aug 2010 12:07:26 +0000 (13:07 +0100)
committerPedro Melo <melo@simplicidade.org>
Sat, 7 Aug 2010 12:26:48 +0000 (13:26 +0100)
 * support commands SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, and PUNSUBSCRIBE:
   * PUBLISH is a regular Redis command, already supported.
 * last argument of subscription changes commands must be a callback, a
   coderef:
   * for each message, the callback will be called with three parameters:
     * the messages data;
     * the topic to whom the message was sent;
     * the subscription that catched this message.
 * you can subscribe to multiple topics or topic matches with the same
   callback: messages to any of those topics will invoke the callback;
 * you can call p?subscribe() multiple times with the same topics or
   topic matching rules, and with different callbacks:
   * we will manage a single Redis subscription, and call all the
     callbacks per message received.
 * p?unsubscribe() calls require the original callback, to remove the
   correct subscription.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
Makefile.PL
lib/Redis.pm
t/03-pubsub.t [new file with mode: 0644]

index 94fb37f..360f476 100644 (file)
@@ -11,6 +11,7 @@ WriteMakefile(
     PREREQ_PM => {
         'Test::More' => 0.92,
         'Test::Exception' => 0,
+        'Test::Deep' => 0,
                'IO::Socket::INET' => 0,
                'Data::Dumper' => 0,
                'Carp' => 0,
index 11ba2cb..3829007 100644 (file)
@@ -4,6 +4,7 @@ use warnings;
 use strict;
 
 use IO::Socket::INET;
+use IO::Select;
 use Fcntl qw( O_NONBLOCK F_SETFL );
 use Data::Dumper;
 use Carp qw/confess/;
@@ -59,10 +60,13 @@ sub new {
   $self->{rbuf}      = '';
 
   $self->{is_subscriber} = 0;
+  $self->{subscribers}   = {};
 
   return bless($self, $class);
 }
 
+sub is_subscriber { $_[0]{is_subscriber} }
+
 
 ### we don't want DESTROY to fallback into AUTOLOAD
 sub DESTROY { }
@@ -81,8 +85,26 @@ sub AUTOLOAD {
   $command =~ s/.*://;
   $self->__is_valid_command($command);
 
-  $self->__send_command($command, @_);
+  ## PubSub commands use a different answer handling
+  if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
+    $pr = '' unless $pr;
+
+    my $cb = pop;
+    confess("Missing required callback in call to $command(), ")
+      unless ref($cb) eq 'CODE';
+
+    my @subs = @_;
+    @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+      if $unsub;
+    return unless @subs;
+
+    $self->__send_command($command, @subs);
 
+    my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+    return $self->__process_subscription_changes($command, \%cbs);
+  }
+
+  $self->__send_command($command, @_);
   return $self->__read_response($command);
 }
 
@@ -126,6 +148,86 @@ sub keys {
 }
 
 
+### PubSub
+sub wait_for_messages {
+  my ($self, $timeout) = @_;
+
+  my $s = IO::Select->new;
+  $s->add($self->{sock});
+
+  my $count = 0;
+  while ($s->can_read($timeout)) {
+    while ($self->__can_read_sock) {
+      my @m = $self->__read_response('WAIT_FOR_MESSAGES');
+      $self->__process_pubsub_msg(\@m);
+      $count++;
+    }
+  }
+
+  return $count;
+}
+
+sub __process_unsubscribe_requests {
+  my ($self, $cb, $pr, @unsubs) = @_;
+  my $subs = $self->{subscribers};
+
+  my @subs_to_unsubscribe;
+  for my $sub (@unsubs) {
+    my $key = "${pr}message:$sub";
+    my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
+    next if @$cbs;
+
+    delete $subs->{$key};
+    push @subs_to_unsubscribe, $sub;
+  }
+
+  return @subs_to_unsubscribe;
+}
+
+sub __process_subscription_changes {
+  my ($self, $cmd, $expected) = @_;
+  my $subs = $self->{subscribers};
+
+  while (%$expected) {
+    my @m = $self->__read_response($cmd);
+
+    ## Deal with pending PUBLISH'ed messages
+    if ($m[0] =~ /^p?message$/) {
+      $self->__process_pubsub_msg(\@m);
+      next;
+    }
+
+    my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
+    $key .= "message:$m[1]";
+    my $cb = delete $expected->{$key};
+
+    push @{$subs->{$key}}, $cb unless $unsub;
+
+    $self->{is_subscriber} = $m[2];
+  }
+}
+
+sub __process_pubsub_msg {
+  my ($self, $m) = @_;
+  my $subs = $self->{subscribers};
+
+  my $sub   = $m->[1];
+  my $cbid  = "$m->[0]:$sub";
+  my $data  = pop @$m;
+  my $topic = $m->[2] || $sub;
+
+  if (!exists $subs->{$cbid}) {
+    warn "Message for topic '$topic' ($cbid) without expected callback, ";
+    return;
+  }
+
+  $_->($data, $topic, $sub) for @{$subs->{$cbid}};
+
+  return 1;
+
+}
+
+
 ### Mode validation
 sub __is_valid_command {
   my ($self, $cmd) = @_;
diff --git a/t/03-pubsub.t b/t/03-pubsub.t
new file mode 100644 (file)
index 0000000..a0b63b9
--- /dev/null
@@ -0,0 +1,118 @@
+#!perl
+
+use strict;
+use warnings;
+use Test::More;
+use Test::Deep;
+use Test::Exception;
+use Redis;
+
+my %got;
+my $pub = Redis->new();
+my $sub = Redis->new();
+
+is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");
+
+## Basic pubsub
+my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
+$sub->subscribe('aa', 'bb', $sub_cb);
+is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
+
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'");
+
+my $sub_cb2 = sub { my ($v, $t, $s) = @_; $got{"2$s"} = uc("$v:$t") };
+$sub->subscribe('aa', $sub_cb2);
+is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
+
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(
+  \%got,
+  {'aa' => 'v1:aa', '2aa' => 'V1:AA'},
+  "... for the expected topic, 'aa', with two handlers"
+);
+
+
+## Trick subscribe with other messages
+my $psub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
+%got = ();
+is($pub->publish('aa', 'v2'), 1, "Delivered to 1 subscriber of topic 'aa'");
+$sub->psubscribe('a*', 'c*', $psub_cb);
+cmp_deeply(
+  \%got,
+  {'aa' => 'v2:aa', '2aa' => 'V2:AA'},
+  '... received message while processing psubscribe(), two handlers'
+);
+
+is($pub->publish('aa', 'v3'), 2, "Delivered to 2 subscriber of topic 'aa'");
+is($sub->wait_for_messages(1), 2, '... yep, got the expected 2 messages');
+cmp_deeply(
+  \%got,
+  {'aa' => 'v3:aa', 'a*' => 'v3:aa', '2aa' => 'V3:AA'},
+  "... for the expected subs, 'aa' and 'a*', three handlers total"
+);
+
+## Test subscribe/psubscribe diffs
+%got = ();
+is($pub->publish('aaa', 'v4'), 1, "Delivered to 1 subscriber of topic 'aaa'");
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'a*' => 'v4:aaa'}, "... for the expected sub, 'a*'");
+
+
+## Subscriber mode status
+is($sub->is_subscriber, 4, 'Current subscriber has 4 subscriptions active');
+is($pub->is_subscriber, 0, '... the publisher has none');
+
+
+## Unsubscribe
+$sub->unsubscribe('xx', sub { });
+is($sub->is_subscriber, 4,
+  "No match to our subscriptions, unsubscribe doesn't change active count");
+
+$sub->unsubscribe('aa', $sub_cb);
+is($sub->is_subscriber, 4,
+  "unsubscribe ok, active count is still 4, another handler is alive");
+
+$sub->unsubscribe('aa', $sub_cb2);
+is($sub->is_subscriber, 3,
+  "unsubscribe done, active count is now 3, both handlers are done");
+
+$pub->publish('aa', 'v5');
+%got = ();
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'a*', 'v5:aa'}, "... for the expected key, 'a*'");
+
+$sub->unsubscribe('a*', $psub_cb);
+is($sub->is_subscriber, 3,
+  "unsubscribe with topic wildcard failed, active count is now 3");
+
+$pub->publish('aa', 'v6');
+%got = ();
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'a*', 'v6:aa'}, "... for the expected key, 'a*'");
+
+$sub->unsubscribe('bb', $sub_cb);
+is($sub->is_subscriber, 2, "unsubscribe with 'bb' ok, active count is now 2");
+
+$sub->punsubscribe('a*', $psub_cb);
+is($sub->is_subscriber, 1,
+  "punsubscribe with 'a*' ok, active count is now 1");
+
+is($pub->publish('aa', 'v6'), 0, "Publish to 'aa' now gives 0 deliveries");
+%got = ();
+is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
+cmp_deeply(\%got, {}, '... and an empty messages recorded set');
+
+is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
+throws_ok sub { $sub->info },
+  qr/Cannot use command 'INFO' while in SUBSCRIBE mode/,
+  '... still an error to try commands in subscribe mode';
+$sub->punsubscribe('c*', $psub_cb);
+is($sub->is_subscriber, 0, '... but none anymore');
+
+lives_ok sub { $sub->info },
+  'Other commands ok after we leave subscriber_mode';
+
+
+## And we are done
+done_testing();