Added support for Redis PubSub:
[perl-Redis.git] / lib / Redis.pm
index 151f7ce..3829007 100644 (file)
@@ -4,6 +4,8 @@ use warnings;
 use strict;
 
 use IO::Socket::INET;
+use IO::Select;
+use Fcntl qw( O_NONBLOCK F_SETFL );
 use Data::Dumper;
 use Carp qw/confess/;
 use Encode;
@@ -42,43 +44,67 @@ with same peace of code with a little help of C<AUTOLOAD>.
 =cut
 
 sub new {
-       my $class = shift;
-       my $self  = {@_};
-
-       $self->{debug} ||= $ENV{REDIS_DEBUG};
-       $self->{encoding} ||= 'utf8';    ## default to lax utf8
-
-       $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
-       $self->{sock} = IO::Socket::INET->new(
-               PeerAddr => $self->{server},
-               Proto => 'tcp',
-       ) || confess("Could not connect to Redis server at $self->{server}: $!");
-       $self->{rbuf} = '';
-       
-       $self->{is_subscriber} = 0;
-
-       return bless($self, $class);
+  my $class = shift;
+  my $self  = {@_};
+
+  $self->{debug} ||= $ENV{REDIS_DEBUG};
+  $self->{encoding} ||= 'utf8';    ## default to lax utf8
+
+  $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
+  $self->{sock} = IO::Socket::INET->new(
+    PeerAddr => $self->{server},
+    Proto    => 'tcp',
+  ) || confess("Could not connect to Redis server at $self->{server}: $!");
+
+  $self->{read_size} = 8192;
+  $self->{rbuf}      = '';
+
+  $self->{is_subscriber} = 0;
+  $self->{subscribers}   = {};
+
+  return bless($self, $class);
 }
 
+sub is_subscriber { $_[0]{is_subscriber} }
+
 
 ### we don't want DESTROY to fallback into AUTOLOAD
-sub DESTROY {}
+sub DESTROY { }
 
 
 ### Deal with common, general case, Redis commands
 our $AUTOLOAD;
+
 sub AUTOLOAD {
-       my $self = shift;
-       my $sock = $self->{sock} || confess("Not connected to any server");
-       my $enc = $self->{encoding};
-       my $deb = $self->{debug};
+  my $self = shift;
+  my $sock = $self->{sock} || confess("Not connected to any server");
+  my $enc  = $self->{encoding};
+  my $deb  = $self->{debug};
+
+  my $command = $AUTOLOAD;
+  $command =~ s/.*://;
+  $self->__is_valid_command($command);
 
-       my $command = $AUTOLOAD;
-       $command =~ s/.*://;
-       $self->__is_valid_command($command);
+  ## PubSub commands use a different answer handling
+  if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
+    $pr = '' unless $pr;
 
-       $self->__send_command($command, @_);
+    my $cb = pop;
+    confess("Missing required callback in call to $command(), ")
+      unless ref($cb) eq 'CODE';
 
+    my @subs = @_;
+    @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+      if $unsub;
+    return unless @subs;
+
+    $self->__send_command($command, @subs);
+
+    my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+    return $self->__process_subscription_changes($command, \%cbs);
+  }
+
+  $self->__send_command($command, @_);
   return $self->__read_response($command);
 }
 
@@ -89,8 +115,8 @@ sub quit {
 
   $self->__send_command('QUIT');
 
-  close(delete $self->{sock}) || confess("Can't close socket: $!");
   delete $self->{rbuf};
+  close(delete $self->{sock}) || confess("Can't close socket: $!");
 
   return 1;
 }
@@ -103,9 +129,7 @@ sub info {
 
   my $info = $self->__read_response('INFO');
 
-  return {
-    map { split(/:/, $_, 2) } split(/\r\n/, $info)
-  };
+  return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
 }
 
 sub keys {
@@ -114,7 +138,8 @@ sub keys {
 
   $self->__send_command('KEYS', @_);
 
-  my @keys = $self->__read_response('INFO', \my $type);
+  my @keys = $self->__read_response('KEYS', \my $type);
+  ## Support redis > 1.26
   return @keys if $type eq '*';
 
   ## Support redis <= 1.2.6
@@ -123,12 +148,92 @@ sub keys {
 }
 
 
+### PubSub
+sub wait_for_messages {
+  my ($self, $timeout) = @_;
+
+  my $s = IO::Select->new;
+  $s->add($self->{sock});
+
+  my $count = 0;
+  while ($s->can_read($timeout)) {
+    while ($self->__can_read_sock) {
+      my @m = $self->__read_response('WAIT_FOR_MESSAGES');
+      $self->__process_pubsub_msg(\@m);
+      $count++;
+    }
+  }
+
+  return $count;
+}
+
+sub __process_unsubscribe_requests {
+  my ($self, $cb, $pr, @unsubs) = @_;
+  my $subs = $self->{subscribers};
+
+  my @subs_to_unsubscribe;
+  for my $sub (@unsubs) {
+    my $key = "${pr}message:$sub";
+    my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
+    next if @$cbs;
+
+    delete $subs->{$key};
+    push @subs_to_unsubscribe, $sub;
+  }
+
+  return @subs_to_unsubscribe;
+}
+
+sub __process_subscription_changes {
+  my ($self, $cmd, $expected) = @_;
+  my $subs = $self->{subscribers};
+
+  while (%$expected) {
+    my @m = $self->__read_response($cmd);
+
+    ## Deal with pending PUBLISH'ed messages
+    if ($m[0] =~ /^p?message$/) {
+      $self->__process_pubsub_msg(\@m);
+      next;
+    }
+
+    my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
+    $key .= "message:$m[1]";
+    my $cb = delete $expected->{$key};
+
+    push @{$subs->{$key}}, $cb unless $unsub;
+
+    $self->{is_subscriber} = $m[2];
+  }
+}
+
+sub __process_pubsub_msg {
+  my ($self, $m) = @_;
+  my $subs = $self->{subscribers};
+
+  my $sub   = $m->[1];
+  my $cbid  = "$m->[0]:$sub";
+  my $data  = pop @$m;
+  my $topic = $m->[2] || $sub;
+
+  if (!exists $subs->{$cbid}) {
+    warn "Message for topic '$topic' ($cbid) without expected callback, ";
+    return;
+  }
+
+  $_->($data, $topic, $sub) for @{$subs->{$cbid}};
+
+  return 1;
+
+}
+
+
 ### Mode validation
 sub __is_valid_command {
   my ($self, $cmd) = @_;
 
   return unless $self->{is_subscriber};
-  return if $cmd =~ /^P?(UN)?SUBSCRIBE$/;
+  return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
   confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
 }
 
@@ -176,21 +281,21 @@ sub __read_response {
     return $result;
   }
   elsif ($type eq '$') {
-               return if $result < 0;
-               return $self->__read_sock($result);
+    return if $result < 0;
+    return $self->__read_sock($result);
   }
   elsif ($type eq '*') {
-         my @list;
-         while ($result--) {
-                 push @list, $self->__read_response($command);
-       }
+    my @list;
+    while ($result--) {
+      push @list, $self->__read_response($command);
+    }
     return @list;
   }
   elsif ($type eq ':') {
     return $result;
   }
   else {
-    confess "unknown answer type: $type ($result), "
+    confess "unknown answer type: $type ($result), ";
   }
 }
 
@@ -202,7 +307,9 @@ sub __read_sock {
   my $rbuf = \($self->{rbuf});
 
   my ($data, $type) = ('', '');
-  my $read_size = defined $len? $len+2 : 8192;
+  my $read_size = $self->{read_size};
+  $read_size = $len + 2 if defined $len && $len + 2 > $read_size;
+
   while (1) {
     ## Read NN bytes, strip \r\n at the end
     if (defined $len) {
@@ -218,7 +325,8 @@ sub __read_sock {
     }
 
     my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf;
-    confess("Error while reading from Redis server: $!") unless defined $bytes;
+    confess("Error while reading from Redis server: $!")
+      unless defined $bytes;
     confess("Redis server closed connection") unless $bytes;
   }
 
@@ -229,6 +337,27 @@ sub __read_sock {
   return $data;
 }
 
+sub __can_read_sock {
+  my ($self) = @_;
+  my $sock   = $self->{sock};
+  my $rbuf   = \($self->{rbuf});
+
+  return 1 if $$rbuf;
+  __fh_nonblocking($sock, 1);
+  my $bytes = sysread $sock, $$rbuf, $self->{read_size}, length $$rbuf;
+  __fh_nonblocking($sock, 0);
+  return 1 if $bytes;
+  return 0;
+}
+
+
+### Copied from AnyEvent::Util
+BEGIN {
+  *__fh_nonblocking = ($^O eq 'MSWin32')
+    ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; }    # FIONBIO
+    : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
+}
+
 
 1;