use strict;
use IO::Socket::INET;
+use IO::Select;
use Fcntl qw( O_NONBLOCK F_SETFL );
use Data::Dumper;
use Carp qw/confess/;
$self->{rbuf} = '';
$self->{is_subscriber} = 0;
+ $self->{subscribers} = {};
return bless($self, $class);
}
+sub is_subscriber { $_[0]{is_subscriber} }
+
### we don't want DESTROY to fallback into AUTOLOAD
sub DESTROY { }
$command =~ s/.*://;
$self->__is_valid_command($command);
- $self->__send_command($command, @_);
+ ## PubSub commands use a different answer handling
+ if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
+ $pr = '' unless $pr;
+
+ my $cb = pop;
+ confess("Missing required callback in call to $command(), ")
+ unless ref($cb) eq 'CODE';
+
+ my @subs = @_;
+ @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+ if $unsub;
+ return unless @subs;
+
+ $self->__send_command($command, @subs);
+ my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+ return $self->__process_subscription_changes($command, \%cbs);
+ }
+
+ $self->__send_command($command, @_);
return $self->__read_response($command);
}
}
+### PubSub
+sub wait_for_messages {
+ my ($self, $timeout) = @_;
+
+ my $s = IO::Select->new;
+ $s->add($self->{sock});
+
+ my $count = 0;
+ while ($s->can_read($timeout)) {
+ while ($self->__can_read_sock) {
+ my @m = $self->__read_response('WAIT_FOR_MESSAGES');
+ $self->__process_pubsub_msg(\@m);
+ $count++;
+ }
+ }
+
+ return $count;
+}
+
+sub __process_unsubscribe_requests {
+ my ($self, $cb, $pr, @unsubs) = @_;
+ my $subs = $self->{subscribers};
+
+ my @subs_to_unsubscribe;
+ for my $sub (@unsubs) {
+ my $key = "${pr}message:$sub";
+ my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
+ next if @$cbs;
+
+ delete $subs->{$key};
+ push @subs_to_unsubscribe, $sub;
+ }
+
+ return @subs_to_unsubscribe;
+}
+
+sub __process_subscription_changes {
+ my ($self, $cmd, $expected) = @_;
+ my $subs = $self->{subscribers};
+
+ while (%$expected) {
+ my @m = $self->__read_response($cmd);
+
+ ## Deal with pending PUBLISH'ed messages
+ if ($m[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg(\@m);
+ next;
+ }
+
+ my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m[1]";
+ my $cb = delete $expected->{$key};
+
+ push @{$subs->{$key}}, $cb unless $unsub;
+
+ $self->{is_subscriber} = $m[2];
+ }
+}
+
+sub __process_pubsub_msg {
+ my ($self, $m) = @_;
+ my $subs = $self->{subscribers};
+
+ my $sub = $m->[1];
+ my $cbid = "$m->[0]:$sub";
+ my $data = pop @$m;
+ my $topic = $m->[2] || $sub;
+
+ if (!exists $subs->{$cbid}) {
+ warn "Message for topic '$topic' ($cbid) without expected callback, ";
+ return;
+ }
+
+ $_->($data, $topic, $sub) for @{$subs->{$cbid}};
+
+ return 1;
+
+}
+
+
### Mode validation
sub __is_valid_command {
my ($self, $cmd) = @_;
--- /dev/null
+#!perl
+
+use strict;
+use warnings;
+use Test::More;
+use Test::Deep;
+use Test::Exception;
+use Redis;
+
+my %got;
+my $pub = Redis->new();
+my $sub = Redis->new();
+
+is($pub->publish('aa', 'v1'), 0, "No subscribers to 'aa' topic");
+
+## Basic pubsub
+my $sub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
+$sub->subscribe('aa', 'bb', $sub_cb);
+is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
+
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'aa' => 'v1:aa'}, "... for the expected topic, 'aa'");
+
+my $sub_cb2 = sub { my ($v, $t, $s) = @_; $got{"2$s"} = uc("$v:$t") };
+$sub->subscribe('aa', $sub_cb2);
+is($pub->publish('aa', 'v1'), 1, "Delivered to 1 subscriber of topic 'aa'");
+
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(
+ \%got,
+ {'aa' => 'v1:aa', '2aa' => 'V1:AA'},
+ "... for the expected topic, 'aa', with two handlers"
+);
+
+
+## Trick subscribe with other messages
+my $psub_cb = sub { my ($v, $t, $s) = @_; $got{$s} = "$v:$t" };
+%got = ();
+is($pub->publish('aa', 'v2'), 1, "Delivered to 1 subscriber of topic 'aa'");
+$sub->psubscribe('a*', 'c*', $psub_cb);
+cmp_deeply(
+ \%got,
+ {'aa' => 'v2:aa', '2aa' => 'V2:AA'},
+ '... received message while processing psubscribe(), two handlers'
+);
+
+is($pub->publish('aa', 'v3'), 2, "Delivered to 2 subscriber of topic 'aa'");
+is($sub->wait_for_messages(1), 2, '... yep, got the expected 2 messages');
+cmp_deeply(
+ \%got,
+ {'aa' => 'v3:aa', 'a*' => 'v3:aa', '2aa' => 'V3:AA'},
+ "... for the expected subs, 'aa' and 'a*', three handlers total"
+);
+
+## Test subscribe/psubscribe diffs
+%got = ();
+is($pub->publish('aaa', 'v4'), 1, "Delivered to 1 subscriber of topic 'aaa'");
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'a*' => 'v4:aaa'}, "... for the expected sub, 'a*'");
+
+
+## Subscriber mode status
+is($sub->is_subscriber, 4, 'Current subscriber has 4 subscriptions active');
+is($pub->is_subscriber, 0, '... the publisher has none');
+
+
+## Unsubscribe
+$sub->unsubscribe('xx', sub { });
+is($sub->is_subscriber, 4,
+ "No match to our subscriptions, unsubscribe doesn't change active count");
+
+$sub->unsubscribe('aa', $sub_cb);
+is($sub->is_subscriber, 4,
+ "unsubscribe ok, active count is still 4, another handler is alive");
+
+$sub->unsubscribe('aa', $sub_cb2);
+is($sub->is_subscriber, 3,
+ "unsubscribe done, active count is now 3, both handlers are done");
+
+$pub->publish('aa', 'v5');
+%got = ();
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'a*', 'v5:aa'}, "... for the expected key, 'a*'");
+
+$sub->unsubscribe('a*', $psub_cb);
+is($sub->is_subscriber, 3,
+ "unsubscribe with topic wildcard failed, active count is now 3");
+
+$pub->publish('aa', 'v6');
+%got = ();
+is($sub->wait_for_messages(1), 1, '... yep, got the expected 1 message');
+cmp_deeply(\%got, {'a*', 'v6:aa'}, "... for the expected key, 'a*'");
+
+$sub->unsubscribe('bb', $sub_cb);
+is($sub->is_subscriber, 2, "unsubscribe with 'bb' ok, active count is now 2");
+
+$sub->punsubscribe('a*', $psub_cb);
+is($sub->is_subscriber, 1,
+ "punsubscribe with 'a*' ok, active count is now 1");
+
+is($pub->publish('aa', 'v6'), 0, "Publish to 'aa' now gives 0 deliveries");
+%got = ();
+is($sub->wait_for_messages(1), 0, '... yep, no messages delivered');
+cmp_deeply(\%got, {}, '... and an empty messages recorded set');
+
+is($sub->is_subscriber, 1, 'Still some pending subcriptions active');
+throws_ok sub { $sub->info },
+ qr/Cannot use command 'INFO' while in SUBSCRIBE mode/,
+ '... still an error to try commands in subscribe mode';
+$sub->punsubscribe('c*', $psub_cb);
+is($sub->is_subscriber, 0, '... but none anymore');
+
+lives_ok sub { $sub->info },
+ 'Other commands ok after we leave subscriber_mode';
+
+
+## And we are done
+done_testing();