sub _new_sock {
my ($self) = @_;
- return IO::Socket::INET->new(
+ my $sock = IO::Socket::INET->new(
PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
Proto => 'tcp',
) or die $!;
+
+ return $sock;
}
sub _send {
return 1;
}
- if ($command eq 'unsubscribe') {
+ if ($command eq 'unsubscribe' || $command eq 'punsubscribe') {
my @results = $self->_read_results();
my $channels_subscribed = $results[-1];
sub subscribe {
my ($self, %args) = @_;
+ return $self->_subscribe('subscribe', %args);
+}
+
+sub psubscribe {
+ my ($self, %args) = @_;
+ return $self->_subscribe('psubscribe', %args);
+}
+
+sub _subscribe {
+ my ($self, $command, %args) = @_;
$args{consume} ||= sub {};
$args{subscribe} ||= sub {};
$args{unsubscribe} ||= sub {};
- $self->_send_command('subscribe', @{ $args{channels} });
+ $self->_send_command($command, @{ $args{channels} });
$self->{subscribed} = 1;
use warnings;
use strict;
-use Test::More tests => 7;
+use Test::More tests => 20;
use lib 'lib';
BEGIN {
ok( my $p = Redis->new(), 'publisher' );
ok( my $s = Redis->new(), 'subscriber' );
-my $consume = sub {
- my ($self, @result) = @_;
+cmp_ok $s->subscribe(
+ channels => ['nonexistent'],
+ subscribe => sub {
+ cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 0, '0 clients received published message' );
+ is_deeply( [ $s->unsubscribe( 'nonexistent' ) ], [ 'unsubscribe', 'nonexistent', 0 ] );
+ },
+ consume => sub { die "Shouldn't receive messages on nonexistent channel" },
+ unsubscribe => sub { 123 },
+), '==', 123;
- is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
- is_deeply( [ $self->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
-};
+cmp_ok $s->subscribe(
+ channels => ['channel-1'],
+ subscribe => sub {
+ cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
+ },
+ consume => sub {
+ my ($self, @result) = @_;
-my $subscribe = sub {
- cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
-};
+ is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
+ is_deeply( [ $s->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
+ },
+ unsubscribe => sub { 234 },
+), '==', 234;
-my $unsubscribe = sub {
- return 234;
-};
+my @consumers = (
+ sub {
+ my ($self, @result) = @_;
+ is_deeply( [ @result ], ['pmessage', 'channel-*', 'channel-1', 'test message'] )
+ },
+ sub {
+ my ($self, @result) = @_;
+ is_deeply( [ @result ], ['pmessage', 'channel-*', 'channel-2', 'test message'] );
+ is_deeply( [ $s->punsubscribe( 'channel-*' ) ], [ 'punsubscribe', 'channel-*', 0 ] );
+ }
+);
+
+cmp_ok $s->psubscribe(
+ channels => ['channel-*'],
+ subscribe => sub {
+ cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 clients received published message' );
+ cmp_ok( $p->publish( 'channel-2', 'test message' ), '==', 1, '1 clients received published message' );
+ cmp_ok( $p->publish( 'nonexistent', 'test message' ), '==', 0, '0 clients received published message' );
+ },
+ consume => sub {
+ my ($self, @result) = @_;
+ my $consumer = shift @consumers;
+ $consumer->($self, @result);
+ },
+ unsubscribe => sub { 345 },
+), '==', 345;
+
+cmp_ok $s->psubscribe(
+ channels => ['nomatch.*'],
+ subscribe => sub {
+ cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 0, '0 clients received published message' );
+ is_deeply( [ $s->punsubscribe( 'nomatch.*' ) ], [ 'punsubscribe', 'nomatch.*', 0 ] );
+ },
+ consume => sub { die "Shouldn't receive messages on nonexistent channel" },
+ unsubscribe => sub { 456 },
+), '==', 456;
-cmp_ok $s->subscribe( channels => ['channel-1'], subscribe => $subscribe, consume => $consume, unsubscribe => $unsubscribe ), '==', 234;