X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FRedis.pm;h=3829007d9d4619f4d0a0a2b8b28e72b014f6551a;hb=56197d6df0962e44fdc7a2d5e4018c7900083087;hp=151f7ce011d349d2e82dbb317818f0c81063f6e8;hpb=f8b1b5abfab57c3b3d0e450ebd4e4d2a3cc90c0f;p=perl-Redis.git diff --git a/lib/Redis.pm b/lib/Redis.pm index 151f7ce..3829007 100644 --- a/lib/Redis.pm +++ b/lib/Redis.pm @@ -4,6 +4,8 @@ use warnings; use strict; use IO::Socket::INET; +use IO::Select; +use Fcntl qw( O_NONBLOCK F_SETFL ); use Data::Dumper; use Carp qw/confess/; use Encode; @@ -42,43 +44,67 @@ with same peace of code with a little help of C. =cut sub new { - my $class = shift; - my $self = {@_}; - - $self->{debug} ||= $ENV{REDIS_DEBUG}; - $self->{encoding} ||= 'utf8'; ## default to lax utf8 - - $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379'; - $self->{sock} = IO::Socket::INET->new( - PeerAddr => $self->{server}, - Proto => 'tcp', - ) || confess("Could not connect to Redis server at $self->{server}: $!"); - $self->{rbuf} = ''; - - $self->{is_subscriber} = 0; - - return bless($self, $class); + my $class = shift; + my $self = {@_}; + + $self->{debug} ||= $ENV{REDIS_DEBUG}; + $self->{encoding} ||= 'utf8'; ## default to lax utf8 + + $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379'; + $self->{sock} = IO::Socket::INET->new( + PeerAddr => $self->{server}, + Proto => 'tcp', + ) || confess("Could not connect to Redis server at $self->{server}: $!"); + + $self->{read_size} = 8192; + $self->{rbuf} = ''; + + $self->{is_subscriber} = 0; + $self->{subscribers} = {}; + + return bless($self, $class); } +sub is_subscriber { $_[0]{is_subscriber} } + ### we don't want DESTROY to fallback into AUTOLOAD -sub DESTROY {} +sub DESTROY { } ### Deal with common, general case, Redis commands our $AUTOLOAD; + sub AUTOLOAD { - my $self = shift; - my $sock = $self->{sock} || confess("Not connected to any server"); - my $enc = $self->{encoding}; - my $deb = $self->{debug}; + my $self = shift; + my $sock = $self->{sock} || confess("Not connected to any server"); + my $enc = $self->{encoding}; + my $deb = $self->{debug}; + + my $command = $AUTOLOAD; + $command =~ s/.*://; + $self->__is_valid_command($command); - my $command = $AUTOLOAD; - $command =~ s/.*://; - $self->__is_valid_command($command); + ## PubSub commands use a different answer handling + if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) { + $pr = '' unless $pr; - $self->__send_command($command, @_); + my $cb = pop; + confess("Missing required callback in call to $command(), ") + unless ref($cb) eq 'CODE'; + my @subs = @_; + @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs) + if $unsub; + return unless @subs; + + $self->__send_command($command, @subs); + + my %cbs = map { ("${pr}message:$_" => $cb) } @subs; + return $self->__process_subscription_changes($command, \%cbs); + } + + $self->__send_command($command, @_); return $self->__read_response($command); } @@ -89,8 +115,8 @@ sub quit { $self->__send_command('QUIT'); - close(delete $self->{sock}) || confess("Can't close socket: $!"); delete $self->{rbuf}; + close(delete $self->{sock}) || confess("Can't close socket: $!"); return 1; } @@ -103,9 +129,7 @@ sub info { my $info = $self->__read_response('INFO'); - return { - map { split(/:/, $_, 2) } split(/\r\n/, $info) - }; + return {map { split(/:/, $_, 2) } split(/\r\n/, $info)}; } sub keys { @@ -114,7 +138,8 @@ sub keys { $self->__send_command('KEYS', @_); - my @keys = $self->__read_response('INFO', \my $type); + my @keys = $self->__read_response('KEYS', \my $type); + ## Support redis > 1.26 return @keys if $type eq '*'; ## Support redis <= 1.2.6 @@ -123,12 +148,92 @@ sub keys { } +### PubSub +sub wait_for_messages { + my ($self, $timeout) = @_; + + my $s = IO::Select->new; + $s->add($self->{sock}); + + my $count = 0; + while ($s->can_read($timeout)) { + while ($self->__can_read_sock) { + my @m = $self->__read_response('WAIT_FOR_MESSAGES'); + $self->__process_pubsub_msg(\@m); + $count++; + } + } + + return $count; +} + +sub __process_unsubscribe_requests { + my ($self, $cb, $pr, @unsubs) = @_; + my $subs = $self->{subscribers}; + + my @subs_to_unsubscribe; + for my $sub (@unsubs) { + my $key = "${pr}message:$sub"; + my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}]; + next if @$cbs; + + delete $subs->{$key}; + push @subs_to_unsubscribe, $sub; + } + + return @subs_to_unsubscribe; +} + +sub __process_subscription_changes { + my ($self, $cmd, $expected) = @_; + my $subs = $self->{subscribers}; + + while (%$expected) { + my @m = $self->__read_response($cmd); + + ## Deal with pending PUBLISH'ed messages + if ($m[0] =~ /^p?message$/) { + $self->__process_pubsub_msg(\@m); + next; + } + + my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/; + $key .= "message:$m[1]"; + my $cb = delete $expected->{$key}; + + push @{$subs->{$key}}, $cb unless $unsub; + + $self->{is_subscriber} = $m[2]; + } +} + +sub __process_pubsub_msg { + my ($self, $m) = @_; + my $subs = $self->{subscribers}; + + my $sub = $m->[1]; + my $cbid = "$m->[0]:$sub"; + my $data = pop @$m; + my $topic = $m->[2] || $sub; + + if (!exists $subs->{$cbid}) { + warn "Message for topic '$topic' ($cbid) without expected callback, "; + return; + } + + $_->($data, $topic, $sub) for @{$subs->{$cbid}}; + + return 1; + +} + + ### Mode validation sub __is_valid_command { my ($self, $cmd) = @_; return unless $self->{is_subscriber}; - return if $cmd =~ /^P?(UN)?SUBSCRIBE$/; + return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i; confess("Cannot use command '$cmd' while in SUBSCRIBE mode, "); } @@ -176,21 +281,21 @@ sub __read_response { return $result; } elsif ($type eq '$') { - return if $result < 0; - return $self->__read_sock($result); + return if $result < 0; + return $self->__read_sock($result); } elsif ($type eq '*') { - my @list; - while ($result--) { - push @list, $self->__read_response($command); - } + my @list; + while ($result--) { + push @list, $self->__read_response($command); + } return @list; } elsif ($type eq ':') { return $result; } else { - confess "unknown answer type: $type ($result), " + confess "unknown answer type: $type ($result), "; } } @@ -202,7 +307,9 @@ sub __read_sock { my $rbuf = \($self->{rbuf}); my ($data, $type) = ('', ''); - my $read_size = defined $len? $len+2 : 8192; + my $read_size = $self->{read_size}; + $read_size = $len + 2 if defined $len && $len + 2 > $read_size; + while (1) { ## Read NN bytes, strip \r\n at the end if (defined $len) { @@ -218,7 +325,8 @@ sub __read_sock { } my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf; - confess("Error while reading from Redis server: $!") unless defined $bytes; + confess("Error while reading from Redis server: $!") + unless defined $bytes; confess("Redis server closed connection") unless $bytes; } @@ -229,6 +337,27 @@ sub __read_sock { return $data; } +sub __can_read_sock { + my ($self) = @_; + my $sock = $self->{sock}; + my $rbuf = \($self->{rbuf}); + + return 1 if $$rbuf; + __fh_nonblocking($sock, 1); + my $bytes = sysread $sock, $$rbuf, $self->{read_size}, length $$rbuf; + __fh_nonblocking($sock, 0); + return 1 if $bytes; + return 0; +} + + +### Copied from AnyEvent::Util +BEGIN { + *__fh_nonblocking = ($^O eq 'MSWin32') + ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; } # FIONBIO + : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; }; +} + 1;