use strict;
use IO::Socket::INET;
+use IO::Select;
+use Fcntl qw( O_NONBLOCK F_SETFL );
use Data::Dumper;
use Carp qw/confess/;
use Encode;
=cut
sub new {
- my $class = shift;
- my $self = {@_};
-
- $self->{debug} ||= $ENV{REDIS_DEBUG};
- $self->{encoding} ||= 'utf8'; ## default to lax utf8
-
- $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
- $self->{sock} = IO::Socket::INET->new(
- PeerAddr => $self->{server},
- Proto => 'tcp',
- ) || confess("Could not connect to Redis server at $self->{server}: $!");
- $self->{rbuf} = '';
-
- $self->{is_subscriber} = 0;
-
- return bless($self, $class);
+ my $class = shift;
+ my $self = {@_};
+
+ $self->{debug} ||= $ENV{REDIS_DEBUG};
+ $self->{encoding} ||= 'utf8'; ## default to lax utf8
+
+ $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
+ $self->{sock} = IO::Socket::INET->new(
+ PeerAddr => $self->{server},
+ Proto => 'tcp',
+ ) || confess("Could not connect to Redis server at $self->{server}: $!");
+
+ $self->{read_size} = 8192;
+ $self->{rbuf} = '';
+
+ $self->{is_subscriber} = 0;
+ $self->{subscribers} = {};
+
+ return bless($self, $class);
}
+sub is_subscriber { $_[0]{is_subscriber} }
+
### we don't want DESTROY to fallback into AUTOLOAD
-sub DESTROY {}
+sub DESTROY { }
### Deal with common, general case, Redis commands
our $AUTOLOAD;
+
sub AUTOLOAD {
- my $self = shift;
- my $sock = $self->{sock} || confess("Not connected to any server");
- my $enc = $self->{encoding};
- my $deb = $self->{debug};
+ my $self = shift;
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
+
+ my $command = $AUTOLOAD;
+ $command =~ s/.*://;
+ $self->__is_valid_command($command);
- my $command = $AUTOLOAD;
- $command =~ s/.*://;
- $self->__is_valid_command($command);
+ ## PubSub commands use a different answer handling
+ if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
+ $pr = '' unless $pr;
- $self->__send_command($command, @_);
+ my $cb = pop;
+ confess("Missing required callback in call to $command(), ")
+ unless ref($cb) eq 'CODE';
+ my @subs = @_;
+ @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+ if $unsub;
+ return unless @subs;
+
+ $self->__send_command($command, @subs);
+
+ my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+ return $self->__process_subscription_changes($command, \%cbs);
+ }
+
+ $self->__send_command($command, @_);
return $self->__read_response($command);
}
$self->__send_command('QUIT');
- close(delete $self->{sock}) || confess("Can't close socket: $!");
delete $self->{rbuf};
+ close(delete $self->{sock}) || confess("Can't close socket: $!");
return 1;
}
my $info = $self->__read_response('INFO');
- return {
- map { split(/:/, $_, 2) } split(/\r\n/, $info)
- };
+ return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
}
sub keys {
$self->__send_command('KEYS', @_);
- my @keys = $self->__read_response('INFO', \my $type);
+ my @keys = $self->__read_response('KEYS', \my $type);
+ ## Support redis > 1.26
return @keys if $type eq '*';
## Support redis <= 1.2.6
}
+### PubSub
+sub wait_for_messages {
+ my ($self, $timeout) = @_;
+
+ my $s = IO::Select->new;
+ $s->add($self->{sock});
+
+ my $count = 0;
+ while ($s->can_read($timeout)) {
+ while ($self->__can_read_sock) {
+ my @m = $self->__read_response('WAIT_FOR_MESSAGES');
+ $self->__process_pubsub_msg(\@m);
+ $count++;
+ }
+ }
+
+ return $count;
+}
+
+sub __process_unsubscribe_requests {
+ my ($self, $cb, $pr, @unsubs) = @_;
+ my $subs = $self->{subscribers};
+
+ my @subs_to_unsubscribe;
+ for my $sub (@unsubs) {
+ my $key = "${pr}message:$sub";
+ my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
+ next if @$cbs;
+
+ delete $subs->{$key};
+ push @subs_to_unsubscribe, $sub;
+ }
+
+ return @subs_to_unsubscribe;
+}
+
+sub __process_subscription_changes {
+ my ($self, $cmd, $expected) = @_;
+ my $subs = $self->{subscribers};
+
+ while (%$expected) {
+ my @m = $self->__read_response($cmd);
+
+ ## Deal with pending PUBLISH'ed messages
+ if ($m[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg(\@m);
+ next;
+ }
+
+ my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m[1]";
+ my $cb = delete $expected->{$key};
+
+ push @{$subs->{$key}}, $cb unless $unsub;
+
+ $self->{is_subscriber} = $m[2];
+ }
+}
+
+sub __process_pubsub_msg {
+ my ($self, $m) = @_;
+ my $subs = $self->{subscribers};
+
+ my $sub = $m->[1];
+ my $cbid = "$m->[0]:$sub";
+ my $data = pop @$m;
+ my $topic = $m->[2] || $sub;
+
+ if (!exists $subs->{$cbid}) {
+ warn "Message for topic '$topic' ($cbid) without expected callback, ";
+ return;
+ }
+
+ $_->($data, $topic, $sub) for @{$subs->{$cbid}};
+
+ return 1;
+
+}
+
+
### Mode validation
sub __is_valid_command {
my ($self, $cmd) = @_;
return unless $self->{is_subscriber};
- return if $cmd =~ /^P?(UN)?SUBSCRIBE$/;
+ return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
}
return $result;
}
elsif ($type eq '$') {
- return if $result < 0;
- return $self->__read_sock($result);
+ return if $result < 0;
+ return $self->__read_sock($result);
}
elsif ($type eq '*') {
- my @list;
- while ($result--) {
- push @list, $self->__read_response($command);
- }
+ my @list;
+ while ($result--) {
+ push @list, $self->__read_response($command);
+ }
return @list;
}
elsif ($type eq ':') {
return $result;
}
else {
- confess "unknown answer type: $type ($result), "
+ confess "unknown answer type: $type ($result), ";
}
}
my $rbuf = \($self->{rbuf});
my ($data, $type) = ('', '');
- my $read_size = defined $len? $len+2 : 8192;
+ my $read_size = $self->{read_size};
+ $read_size = $len + 2 if defined $len && $len + 2 > $read_size;
+
while (1) {
## Read NN bytes, strip \r\n at the end
if (defined $len) {
}
my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf;
- confess("Error while reading from Redis server: $!") unless defined $bytes;
+ confess("Error while reading from Redis server: $!")
+ unless defined $bytes;
confess("Redis server closed connection") unless $bytes;
}
return $data;
}
+sub __can_read_sock {
+ my ($self) = @_;
+ my $sock = $self->{sock};
+ my $rbuf = \($self->{rbuf});
+
+ return 1 if $$rbuf;
+ __fh_nonblocking($sock, 1);
+ my $bytes = sysread $sock, $$rbuf, $self->{read_size}, length $$rbuf;
+ __fh_nonblocking($sock, 0);
+ return 1 if $bytes;
+ return 0;
+}
+
+
+### Copied from AnyEvent::Util
+BEGIN {
+ *__fh_nonblocking = ($^O eq 'MSWin32')
+ ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; } # FIONBIO
+ : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
+}
+
1;