Fix: __is_valid_command() must deal with both upper and lower-case cmds
[perl-Redis.git] / lib / Redis.pm
index a764584..3da5f28 100644 (file)
@@ -4,6 +4,7 @@ use warnings;
 use strict;
 
 use IO::Socket::INET;
+use Fcntl qw( O_NONBLOCK F_SETFL );
 use Data::Dumper;
 use Carp qw/confess/;
 use Encode;
@@ -42,81 +43,97 @@ with same peace of code with a little help of C<AUTOLOAD>.
 =cut
 
 sub new {
-       my $class = shift;
-       my $self  = {@_};
+  my $class = shift;
+  my $self  = {@_};
 
-       $self->{debug} ||= $ENV{REDIS_DEBUG};
-       $self->{encoding} ||= 'utf8';    ## default to lax utf8
+  $self->{debug} ||= $ENV{REDIS_DEBUG};
+  $self->{encoding} ||= 'utf8';    ## default to lax utf8
 
-       $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
-       $self->{sock} = IO::Socket::INET->new(
-               PeerAddr => $self->{server},
-               Proto => 'tcp',
-       ) || confess("Could not connect to Redis server at $self->{server}: $!");
+  $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
+  $self->{sock} = IO::Socket::INET->new(
+    PeerAddr => $self->{server},
+    Proto    => 'tcp',
+  ) || confess("Could not connect to Redis server at $self->{server}: $!");
+  $self->{rbuf} = '';
 
-       return bless($self, $class);
+  $self->{is_subscriber} = 0;
+
+  return bless($self, $class);
 }
 
-# we don't want DESTROY to fallback into AUTOLOAD
-sub DESTROY {}
 
+### we don't want DESTROY to fallback into AUTOLOAD
+sub DESTROY { }
+
+
+### Deal with common, general case, Redis commands
 our $AUTOLOAD;
+
 sub AUTOLOAD {
-       my $self = shift;
-       my $sock = $self->{sock} || confess("Not connected to any server");
-       my $enc = $self->{encoding};
-       my $deb = $self->{debug};
-
-       my $command = $AUTOLOAD;
-       $command =~ s/.*://;
-
-       $self->__send_command($command, @_);
-
-       if ( $command eq 'quit' ) {
-               close( $sock ) || confess("Can't close socket: $!");
-               return 1;
-       }
-
-       my $result = <$sock> || confess("Can't read socket: $!");
-       my $type = substr($result,0,1);
-       $result = substr($result,1,-2);
-
-       $result = decode($enc, $result) if $enc;
-       warn "[RECV] '$type$result'" if $deb;
-
-       if ( $command eq 'info' ) {
-               my $hash;
-               foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) {
-                       my ($n,$v) = split(/:/, $l, 2);
-                       $hash->{$n} = $v;
-               }
-               return $hash;
-       } elsif ( $command eq 'keys' ) {
-               return $self->__read_multi_bulk($result)
-                       if $type eq '*';
-               my $keys = $self->__read_bulk($result);
-               return split(/\s/, $keys) if $keys;
-               return;
-       }
-
-       if ( $type eq '-' ) {
-               confess "[$command] $result";
-       } elsif ( $type eq '+' ) {
-               return $result;
-       } elsif ( $type eq '$' ) {
-               return $self->__read_bulk($result);
-       } elsif ( $type eq '*' ) {
-               return $self->__read_multi_bulk($result);
-       } elsif ( $type eq ':' ) {
-               return $result; # FIXME check if int?
-       } else {
-               confess "unknown type: $type", $self->__read_line();
-       }
+  my $self = shift;
+  my $sock = $self->{sock} || confess("Not connected to any server");
+  my $enc  = $self->{encoding};
+  my $deb  = $self->{debug};
+
+  my $command = $AUTOLOAD;
+  $command =~ s/.*://;
+  $self->__is_valid_command($command);
+
+  $self->__send_command($command, @_);
+
+  return $self->__read_response($command);
 }
 
 
-### Socket operations
+### Commands with extra logic
+sub quit {
+  my ($self) = @_;
+
+  $self->__send_command('QUIT');
+
+  close(delete $self->{sock}) || confess("Can't close socket: $!");
+  delete $self->{rbuf};
+
+  return 1;
+}
+
+sub info {
+  my ($self) = @_;
+  $self->__is_valid_command('INFO');
+
+  $self->__send_command('INFO');
+
+  my $info = $self->__read_response('INFO');
+
+  return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
+}
+
+sub keys {
+  my $self = shift;
+  $self->__is_valid_command('KEYS');
+
+  $self->__send_command('KEYS', @_);
+
+  my @keys = $self->__read_response('INFO', \my $type);
+  return @keys if $type eq '*';
+
+  ## Support redis <= 1.2.6
+  return split(/\s/, $keys[0]) if $keys[0];
+  return;
+}
+
 
+### Mode validation
+sub __is_valid_command {
+  my ($self, $cmd) = @_;
+
+  return unless $self->{is_subscriber};
+  return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
+  confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
+}
+
+
+### Socket operations
 sub __send_command {
   my $self = shift;
   my $cmd  = uc(shift);
@@ -146,42 +163,95 @@ sub __send_command {
   return;
 }
 
-sub __read_bulk {
-       my ($self,$len) = @_;
-       return if $len < 0;
-
-       my $enc = $self->{encoding};
-       my $v = '';
-       if ( $len > 0 ) {
-               read($self->{sock}, $v, $len) || confess("Could not read from sock: $!");
-               $v = decode($enc, $v) if $enc;
-       }
-       my $crlf;
-       read($self->{sock}, $crlf, 2); # skip cr/lf
-
-       warn "[PARSE] read_bulk ".Dumper($v) if $self->{debug};
-       return $v;
+sub __read_response {
+  my ($self, $command, $type_r) = @_;
+
+  my ($type, $result) = $self->__read_sock;
+  $$type_r = $type if $type_r;
+
+  if ($type eq '-') {
+    confess "[$command] $result, ";
+  }
+  elsif ($type eq '+') {
+    return $result;
+  }
+  elsif ($type eq '$') {
+    return if $result < 0;
+    return $self->__read_sock($result);
+  }
+  elsif ($type eq '*') {
+    my @list;
+    while ($result--) {
+      push @list, $self->__read_response($command);
+    }
+    return @list;
+  }
+  elsif ($type eq ':') {
+    return $result;
+  }
+  else {
+    confess "unknown answer type: $type ($result), ";
+  }
 }
 
-sub __read_multi_bulk {
-       my ($self,$size) = @_;
-       return if $size <= 0;
-
-       my $sock = $self->{sock};
-       my $deb = $self->{debug};
-       my $enc = $self->{encoding};
-  my @list;    
-       while ($size--) {
-               my $v = $self->__read_bulk( substr(<$sock>,1,-2) );
-               $v = decode($enc, $v) if $enc;
-               warn "  [PARSE] read_multi_bulk ($size) ".Dumper($v) if $deb;
-               push @list, $v;
-       }
-
-       warn "[PARSE] multi_bulk ".Dumper( \@list ) if $deb;
-       return @list;
+sub __read_sock {
+  my ($self, $len) = @_;
+  my $sock = $self->{sock} || confess("Not connected to any server");
+  my $enc  = $self->{encoding};
+  my $deb  = $self->{debug};
+  my $rbuf = \($self->{rbuf});
+
+  my ($data, $type) = ('', '');
+  my $read_size = defined $len ? $len + 2 : 8192;
+  while (1) {
+    ## Read NN bytes, strip \r\n at the end
+    if (defined $len) {
+      if (length($$rbuf) >= $len + 2) {
+        $data = substr(substr($$rbuf, 0, $len + 2, ''), 0, -2);
+        last;
+      }
+    }
+    ## No len, means line more, read until \r\n
+    elsif ($$rbuf =~ s/^(.)([^\015\012]*)\015\012//) {
+      ($type, $data) = ($1, $2);
+      last;
+    }
+
+    my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf;
+    confess("Error while reading from Redis server: $!")
+      unless defined $bytes;
+    confess("Redis server closed connection") unless $bytes;
+  }
+
+  $data = decode($enc, $data) if $enc;
+  warn "[RECV] '$type$data'" if $self->{debug};
+
+  return ($type, $data) if $type;
+  return $data;
 }
 
+sub __can_read_sock {
+  my ($self) = @_;
+  my $sock   = $self->{sock};
+  my $rbuf   = \($self->{rbuf});
+
+  return 1 if $$rbuf;
+  __fh_nonblocking($sock, 1);
+  my $bytes = sysread $sock, $$rbuf, 8192, length $$rbuf;
+  __fh_nonblocking($sock, 0);
+  return 1 if $bytes;
+  return 0;
+}
+
+
+### Copied from AnyEvent::Util
+BEGIN {
+  *__fh_nonblocking = ($^O eq 'MSWin32')
+    ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; }    # FIONBIO
+    : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
+}
+
+
 1;
 
 __END__