From 56197d6df0962e44fdc7a2d5e4018c7900083087 Mon Sep 17 00:00:00 2001 From: Pedro Melo Date: Sat, 7 Aug 2010 13:07:26 +0100 Subject: [PATCH] Added support for Redis PubSub: * support commands SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, and PUNSUBSCRIBE: * PUBLISH is a regular Redis command, already supported. * last argument of subscription changes commands must be a callback, a coderef: * for each message, the callback will be called with three parameters: * the messages data; * the topic to whom the message was sent; * the subscription that catched this message. * you can subscribe to multiple topics or topic matches with the same callback: messages to any of those topics will invoke the callback; * you can call p?subscribe() multiple times with the same topics or topic matching rules, and with different callbacks: * we will manage a single Redis subscription, and call all the callbacks per message received. * p?unsubscribe() calls require the original callback, to remove the correct subscription. Signed-off-by: Pedro Melo --- Makefile.PL | 1 + lib/Redis.pm | 104 +++++++++++++++++++++++++++++++++++++++++++- t/03-pubsub.t | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 t/03-pubsub.t diff --git a/Makefile.PL b/Makefile.PL index 94fb37f..360f476 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -11,6 +11,7 @@ WriteMakefile( PREREQ_PM => { 'Test::More' => 0.92, 'Test::Exception' => 0, + 'Test::Deep' => 0, 'IO::Socket::INET' => 0, 'Data::Dumper' => 0, 'Carp' => 0, diff --git a/lib/Redis.pm b/lib/Redis.pm index 11ba2cb..3829007 100644 --- a/lib/Redis.pm +++ b/lib/Redis.pm @@ -4,6 +4,7 @@ use warnings; use strict; use IO::Socket::INET; +use IO::Select; use Fcntl qw( O_NONBLOCK F_SETFL ); use Data::Dumper; use Carp qw/confess/; @@ -59,10 +60,13 @@ sub new { $self->{rbuf} = ''; $self->{is_subscriber} = 0; + $self->{subscribers} = {}; return bless($self, $class); } +sub is_subscriber { $_[0]{is_subscriber} } + ### we don't want DESTROY to fallback into AUTOLOAD sub DESTROY { } @@ -81,8 +85,26 @@ sub AUTOLOAD { $command =~ s/.*://; $self->__is_valid_command($command); - $self->__send_command($command, @_); + ## PubSub commands use a different answer handling + if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) { + $pr = '' unless $pr; + + my $cb = pop; + confess("Missing required callback in call to $command(), ") + unless ref($cb) eq 'CODE'; + + my @subs = @_; + @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs) + if $unsub; + return unless @subs; + + $self->__send_command($command, @subs); + my %cbs = map { ("${pr}message:$_" => $cb) } @subs; + return $self->__process_subscription_changes($command, \%cbs); + } + + $self->__send_command($command, @_); return $self->__read_response($command); } @@ -126,6 +148,86 @@ sub keys { } +### PubSub +sub wait_for_messages { + my ($self, $timeout) = @_; + + my $s = IO::Select->new; + $s->add($self->{sock}); + + my $count = 0; + while ($s->can_read($timeout)) { + while ($self->__can_read_sock) { + my @m = $self->__read_response('WAIT_FOR_MESSAGES'); + $self->__process_pubsub_msg(\@m); + $count++; + } + } + + return $count; +} + +sub __process_unsubscribe_requests { + my ($self, $cb, $pr, @unsubs) = @_; + my $subs = $self->{subscribers}; + + my @subs_to_unsubscribe; + for my $sub (@unsubs) { + my $key = "${pr}message:$sub"; + my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}]; + next if @$cbs; + + delete $subs->{$key}; + push @subs_to_unsubscribe, $sub; + } + + return @subs_to_unsubscribe; +} + +sub __process_subscription_changes { + my ($self, $cmd, $expected) = @_; + my $subs = $self->{subscribers}; + + while (%$expected) { + my @m = $self->__read_response($cmd); + + ## Deal with pending PUBLISH'ed messages + if ($m[0] =~ /^p?message$/) { + $self->__process_pubsub_msg(\@m); + next; + } + + my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/; + $key .= "message:$m[1]"; + my $cb = delete $expected->{$key}; + + push @{$subs->{$key}}, $cb unless $unsub; + + $self->{is_subscriber} = $m[2]; + } +} + +sub __process_pubsub_msg { + my ($self, $m) = @_; + my $subs = $self->{subscribers}; + + my $sub = $m->[1]; + my $cbid = "$m->[0]:$sub"; + my $data = pop @$m; + my $topic = $m->[2] || $sub; + + if (!exists $subs->{$cbid}) { + warn "Message for topic '$topic' ($cbid) without expected callback, "; + return; + } + + $_->($data, $topic, $sub) for @{$subs->{$cbid}}; + + return 1; + +} + + ### Mode validation sub __is_valid_command { my ($self, $cmd) = @_; diff --git a/t/03-pubsub.t b/t/03-pubsub.t new file mode 100644 index 0000000..a0b63b9 --- /dev/null +++ b/t/03-pubsub.t @@ -0,0 +1,118 @@ +#!perl + +use strict; +use warnings; +use Test::More; +use Test::Deep; +use Test::Exception; +use Redis; + +my %got; +my $pub = Redis->new(); +my $sub = Redis->new(); + +is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic"); + +## Basic pubsub +my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" }; +$sub->subscribe('aa', 'bb', $sub_cb); +is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'"); + +is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message'); +cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'"); + +my $sub_cb2 = sub { my ($v, $t, $s) = @_; $got{"2$s"} = uc("$v:$t") }; +$sub->subscribe('aa', $sub_cb2); +is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'"); + +is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message'); +cmp_deeply( + \%got, + {'aa' => 'v1:aa', '2aa' => 'V1:AA'}, + "... for the expected topic, 'aa', with two handlers" +); + + +## Trick subscribe with other messages +my $psub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" }; +%got = (); +is($pub->publish('aa', 'v2'), 1, "Delivered to 1 subscriber of topic 'aa'"); +$sub->psubscribe('a*', 'c*', $psub_cb); +cmp_deeply( + \%got, + {'aa' => 'v2:aa', '2aa' => 'V2:AA'}, + '... received message while processing psubscribe(), two handlers' +); + +is($pub->publish('aa', 'v3'), 2, "Delivered to 2 subscriber of topic 'aa'"); +is($sub->wait_for_messages(1), 2, '... yep, got the expected 2 messages'); +cmp_deeply( + \%got, + {'aa' => 'v3:aa', 'a*' => 'v3:aa', '2aa' => 'V3:AA'}, + "... for the expected subs, 'aa' and 'a*', three handlers total" +); + +## Test subscribe/psubscribe diffs +%got = (); +is($pub->publish('aaa', 'v4'), 1, "Delivered to 1 subscriber of topic 'aaa'"); +is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message'); +cmp_deeply(\%got, {'a*' => 'v4:aaa'}, "... for the expected sub, 'a*'"); + + +## Subscriber mode status +is($sub->is_subscriber, 4, 'Current subscriber has 4 subscriptions active'); +is($pub->is_subscriber, 0, '... the publisher has none'); + + +## Unsubscribe +$sub->unsubscribe('xx', sub { }); +is($sub->is_subscriber, 4, + "No match to our subscriptions, unsubscribe doesn't change active count"); + +$sub->unsubscribe('aa', $sub_cb); +is($sub->is_subscriber, 4, + "unsubscribe ok, active count is still 4, another handler is alive"); + +$sub->unsubscribe('aa', $sub_cb2); +is($sub->is_subscriber, 3, + "unsubscribe done, active count is now 3, both handlers are done"); + +$pub->publish('aa', 'v5'); +%got = (); +is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message'); +cmp_deeply(\%got, {'a*', 'v5:aa'}, "... for the expected key, 'a*'"); + +$sub->unsubscribe('a*', $psub_cb); +is($sub->is_subscriber, 3, + "unsubscribe with topic wildcard failed, active count is now 3"); + +$pub->publish('aa', 'v6'); +%got = (); +is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message'); +cmp_deeply(\%got, {'a*', 'v6:aa'}, "... for the expected key, 'a*'"); + +$sub->unsubscribe('bb', $sub_cb); +is($sub->is_subscriber, 2, "unsubscribe with 'bb' ok, active count is now 2"); + +$sub->punsubscribe('a*', $psub_cb); +is($sub->is_subscriber, 1, + "punsubscribe with 'a*' ok, active count is now 1"); + +is($pub->publish('aa', 'v6'), 0, "Publish to 'aa' now gives 0 deliveries"); +%got = (); +is($sub->wait_for_messages(1), 0, '... yep, no messages delivered'); +cmp_deeply(\%got, {}, '... and an empty messages recorded set'); + +is($sub->is_subscriber, 1, 'Still some pending subcriptions active'); +throws_ok sub { $sub->info }, + qr/Cannot use command 'INFO' while in SUBSCRIBE mode/, + '... still an error to try commands in subscribe mode'; +$sub->punsubscribe('c*', $psub_cb); +is($sub->is_subscriber, 0, '... but none anymore'); + +lives_ok sub { $sub->info }, + 'Other commands ok after we leave subscriber_mode'; + + +## And we are done +done_testing(); -- 2.20.1