}
sub _read_multi_bulk {
- my ($self,$size) = @_;
+ my ($self, $size) = @_;
return if $size < 0;
sub subscribe {
my ($self, %args) = @_;
- return $self->_subscribe('subscribe', %args);
+
+ $self->_send_command('subscribe', @{ $args{channels} });
+
+ return $self->_psubscribe(
+ pmessage => $args{message} || sub {},
+ psubscribe => $args{subscribe} || sub {},
+ );
}
sub psubscribe {
my ($self, %args) = @_;
- return $self->_subscribe('psubscribe', %args);
+
+ $self->_send_command('psubscribe', @{ $args{patterns} });
+ return $self->_psubscribe(%args);
}
-sub _subscribe {
- my ($self, $command, %args) = @_;
+sub _psubscribe {
+ my ($self, %args) = @_;
- $args{consume} ||= sub {};
- $args{subscribe} ||= sub {};
- $args{unsubscribe} ||= sub {};
-
- $self->_send_command($command, @{ $args{channels} });
+ $args{pmessage} ||= sub {};
+ $args{psubscribe} ||= sub {};
$self->{subscribed} = 1;
# Setup handler called right after the client subscribes to channels
- $args{subscribe}->( $self, $self->_read_results() );
+ $args{psubscribe}->( $self, $self->_read_results() );
while ($self->{subscribed}) {
# Handles each message consumed from the channel
- $args{consume}->( $self, $self->_read_results() );
+ my @results = $self->_read_results();
+ $args{pmessage}->( $self, splice @results, 1 );
}
- # Teardown handler called right after the client unsubscribes from the last channel
- return $args{unsubscribe}->($self);
+ return 1;
}
sub info {
#!/usr/bin/perl
-
use warnings;
use strict;
-
-use Test::More tests => 20;
+use Test::More tests => 12;
use lib 'lib';
BEGIN {
use_ok( 'Redis' );
}
-diag "Publish / subscribe commands";
-
-ok( my $p = Redis->new(), 'publisher' );
-ok( my $s = Redis->new(), 'subscriber' );
+ok my $p = Redis->new(), 'publisher';
+ok my $s = Redis->new(), 'subscriber';
-cmp_ok $s->subscribe(
+ok $s->subscribe(
channels => ['nonexistent'],
subscribe => sub {
- cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 0, '0 clients received published message' );
- is_deeply( [ $s->unsubscribe( 'nonexistent' ) ], [ 'unsubscribe', 'nonexistent', 0 ] );
+ $p->publish('channel-1', 'test message');
+ $s->unsubscribe('nonexistent');
},
- consume => sub { die "Shouldn't receive messages on nonexistent channel" },
- unsubscribe => sub { 123 },
-), '==', 123;
+ message => sub { die "Shouldn't receive messages on nonexistent channel" },
+);
-cmp_ok $s->subscribe(
+$s->subscribe(
channels => ['channel-1'],
subscribe => sub {
- cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
+ $p->publish('channel-1', 'test message');
},
- consume => sub {
- my ($self, @result) = @_;
+ message => sub {
+ my ($self, $channel, $message) = @_;
+
+ is $message, 'test message';
+ is $channel, 'channel-1';
- is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
- is_deeply( [ $s->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
+ $s->unsubscribe('channel-1');
},
- unsubscribe => sub { 234 },
-), '==', 234;
+);
my @consumers = (
sub {
- my ($self, @result) = @_;
- is_deeply( [ @result ], ['pmessage', 'channel-*', 'channel-1', 'test message'] )
+ my ($self, $pattern, $channel, $message) = @_;
+
+ is $message, 'test message 1';
+ is $pattern, 'channel-*';
+ is $channel, 'channel-1';
},
sub {
- my ($self, @result) = @_;
- is_deeply( [ @result ], ['pmessage', 'channel-*', 'channel-2', 'test message'] );
- is_deeply( [ $s->punsubscribe( 'channel-*' ) ], [ 'punsubscribe', 'channel-*', 0 ] );
+ my ($self, $pattern, $channel, $message) = @_;
+
+ is $message, 'test message 2';
+ is $pattern, 'channel-*';
+ is $channel, 'channel-2';
+
+ $s->punsubscribe('channel-*');
}
);
-cmp_ok $s->psubscribe(
- channels => ['channel-*'],
- subscribe => sub {
- cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 clients received published message' );
- cmp_ok( $p->publish( 'channel-2', 'test message' ), '==', 1, '1 clients received published message' );
- cmp_ok( $p->publish( 'nonexistent', 'test message' ), '==', 0, '0 clients received published message' );
+$s->psubscribe(
+ patterns => ['channel-*'],
+ psubscribe => sub {
+ $p->publish('channel-1', 'test message 1');
+ $p->publish('channel-2', 'test message 2');
+ $p->publish('nonexistent', 'test message 3');
},
- consume => sub {
+ pmessage => sub {
my ($self, @result) = @_;
my $consumer = shift @consumers;
$consumer->($self, @result);
},
- unsubscribe => sub { 345 },
-), '==', 345;
+);
-cmp_ok $s->psubscribe(
- channels => ['nomatch.*'],
- subscribe => sub {
- cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 0, '0 clients received published message' );
- is_deeply( [ $s->punsubscribe( 'nomatch.*' ) ], [ 'punsubscribe', 'nomatch.*', 0 ] );
+$s->psubscribe(
+ patterns => ['nomatch.*'],
+ psubscribe => sub {
+ $p->publish('channel-1', 'test message');
+ $s->punsubscribe('nomatch.*');
},
- consume => sub { die "Shouldn't receive messages on nonexistent channel" },
- unsubscribe => sub { 456 },
-), '==', 456;
+ pmessage => sub { die "Shouldn't receive messages on nonexistent channel but got @_" },
+);