use strict;
use IO::Socket::INET;
+use Fcntl qw( O_NONBLOCK F_SETFL );
use Data::Dumper;
use Carp qw/confess/;
use Encode;
=cut
sub new {
- my $class = shift;
- my $self = {@_};
+ my $class = shift;
+ my $self = {@_};
- $self->{debug} ||= $ENV{REDIS_DEBUG};
- $self->{encoding} ||= 'utf8'; ## default to lax utf8
+ $self->{debug} ||= $ENV{REDIS_DEBUG};
+ $self->{encoding} ||= 'utf8'; ## default to lax utf8
- $self->{sock} = IO::Socket::INET->new(
- PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
- Proto => 'tcp',
- ) || die $!;
+ $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
+ $self->{sock} = IO::Socket::INET->new(
+ PeerAddr => $self->{server},
+ Proto => 'tcp',
+ ) || confess("Could not connect to Redis server at $self->{server}: $!");
- return bless($self, $class);
+ $self->{read_size} = 8192;
+ $self->{rbuf} = '';
+
+ $self->{is_subscriber} = 0;
+
+ return bless($self, $class);
}
-# we don't want DESTROY to fallback into AUTOLOAD
-sub DESTROY {}
+### we don't want DESTROY to fallback into AUTOLOAD
+sub DESTROY { }
+
+
+### Deal with common, general case, Redis commands
our $AUTOLOAD;
+
sub AUTOLOAD {
- my $self = shift;
- my $sock = $self->{sock} || die "no server connected";
- my $enc = $self->{encoding};
- my $deb = $self->{debug};
-
- my $command = $AUTOLOAD;
- $command =~ s/.*://;
- warn "[SEND] $command ",Dumper([@_]) if $deb;
-
- my $n_elems = scalar(@_)+1;
- my $send = "\*$n_elems\r\n";
- for my $str (uc($command), @_) {
- my $bin = $enc? encode($enc, $str) : $str;
- $send .= defined($bin)? '$'.length($bin)."\r\n$bin\r\n" : "\$-1\r\n";
- }
-
- warn "[SEND RAW] $send" if $deb;
- print $sock $send;
-
- if ( $command eq 'quit' ) {
- close( $sock ) || die "can't close socket: $!";
- return 1;
- }
-
- my $result = <$sock> || die "can't read socket: $!";
- my $type = substr($result,0,1);
- $result = substr($result,1,-2);
-
- $result = decode($enc, $result) if $enc;
- warn "[RECV] '$type$result'" if $deb;
-
- if ( $command eq 'info' ) {
- my $hash;
- foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) {
- my ($n,$v) = split(/:/, $l, 2);
- $hash->{$n} = $v;
- }
- return $hash;
- } elsif ( $command eq 'keys' ) {
- return $self->__read_multi_bulk($result)
- if $type eq '*';
- my $keys = $self->__read_bulk($result);
- return split(/\s/, $keys) if $keys;
- return;
- }
-
- if ( $type eq '-' ) {
- confess "[$command] $result";
- } elsif ( $type eq '+' ) {
- return $result;
- } elsif ( $type eq '$' ) {
- return $self->__read_bulk($result);
- } elsif ( $type eq '*' ) {
- return $self->__read_multi_bulk($result);
- } elsif ( $type eq ':' ) {
- return $result; # FIXME check if int?
- } else {
- confess "unknown type: $type", $self->__read_line();
- }
+ my $self = shift;
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
+
+ my $command = $AUTOLOAD;
+ $command =~ s/.*://;
+ $self->__is_valid_command($command);
+
+ $self->__send_command($command, @_);
+
+ return $self->__read_response($command);
}
-sub __read_bulk {
- my ($self,$len) = @_;
- return if $len < 0;
-
- my $enc = $self->{encoding};
- my $v = '';
- if ( $len > 0 ) {
- read($self->{sock}, $v, $len) || die $!;
- $v = decode($enc, $v) if $enc;
- }
- my $crlf;
- read($self->{sock}, $crlf, 2); # skip cr/lf
-
- warn "[PARSE] read_bulk ".Dumper($v) if $self->{debug};
- return $v;
+
+### Commands with extra logic
+sub quit {
+ my ($self) = @_;
+
+ $self->__send_command('QUIT');
+
+ close(delete $self->{sock}) || confess("Can't close socket: $!");
+ delete $self->{rbuf};
+
+ return 1;
}
-sub __read_multi_bulk {
- my ($self,$size) = @_;
- return if $size <= 0;
-
- my $sock = $self->{sock};
- my $deb = $self->{debug};
- my $enc = $self->{encoding};
- my @list;
- while ($size--) {
- my $v = $self->__read_bulk( substr(<$sock>,1,-2) );
- $v = decode($enc, $v) if $enc;
- warn " [PARSE] read_multi_bulk ($size) ".Dumper($v) if $deb;
- push @list, $v;
- }
-
- warn "[PARSE] multi_bulk ".Dumper( \@list ) if $deb;
- return @list;
+sub info {
+ my ($self) = @_;
+ $self->__is_valid_command('INFO');
+
+ $self->__send_command('INFO');
+
+ my $info = $self->__read_response('INFO');
+
+ return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
}
+sub keys {
+ my $self = shift;
+ $self->__is_valid_command('KEYS');
+
+ $self->__send_command('KEYS', @_);
+
+ my @keys = $self->__read_response('INFO', \my $type);
+ return @keys if $type eq '*';
+
+ ## Support redis <= 1.2.6
+ return split(/\s/, $keys[0]) if $keys[0];
+ return;
+}
+
+
+### Mode validation
+sub __is_valid_command {
+ my ($self, $cmd) = @_;
+
+ return unless $self->{is_subscriber};
+ return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
+ confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
+}
+
+
+### Socket operations
+sub __send_command {
+ my $self = shift;
+ my $cmd = uc(shift);
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
+
+ warn "[SEND] $cmd ", Dumper([@_]) if $deb;
+
+ ## Encode command using multi-bulk format
+ my $n_elems = scalar(@_) + 1;
+ my $buf = "\*$n_elems\r\n";
+ for my $elem ($cmd, @_) {
+ my $bin = $enc ? encode($enc, $elem) : $elem;
+ $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
+ }
+
+ ## Send command, take care for partial writes
+ warn "[SEND RAW] $buf" if $deb;
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ while ($buf) {
+ my $len = syswrite $sock, $buf, length $buf;
+ confess("Could not write to Redis server: $!")
+ unless $len;
+ substr $buf, 0, $len, "";
+ }
+
+ return;
+}
+
+sub __read_response {
+ my ($self, $command, $type_r) = @_;
+
+ my ($type, $result) = $self->__read_sock;
+ $$type_r = $type if $type_r;
+
+ if ($type eq '-') {
+ confess "[$command] $result, ";
+ }
+ elsif ($type eq '+') {
+ return $result;
+ }
+ elsif ($type eq '$') {
+ return if $result < 0;
+ return $self->__read_sock($result);
+ }
+ elsif ($type eq '*') {
+ my @list;
+ while ($result--) {
+ push @list, $self->__read_response($command);
+ }
+ return @list;
+ }
+ elsif ($type eq ':') {
+ return $result;
+ }
+ else {
+ confess "unknown answer type: $type ($result), ";
+ }
+}
+
+sub __read_sock {
+ my ($self, $len) = @_;
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
+ my $rbuf = \($self->{rbuf});
+
+ my ($data, $type) = ('', '');
+ my $read_size = $self->{read_size};
+ $read_size = $len + 2 if defined $len && $len + 2 > $read_size;
+
+ while (1) {
+ ## Read NN bytes, strip \r\n at the end
+ if (defined $len) {
+ if (length($$rbuf) >= $len + 2) {
+ $data = substr(substr($$rbuf, 0, $len + 2, ''), 0, -2);
+ last;
+ }
+ }
+ ## No len, means line more, read until \r\n
+ elsif ($$rbuf =~ s/^(.)([^\015\012]*)\015\012//) {
+ ($type, $data) = ($1, $2);
+ last;
+ }
+
+ my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf;
+ confess("Error while reading from Redis server: $!")
+ unless defined $bytes;
+ confess("Redis server closed connection") unless $bytes;
+ }
+
+ $data = decode($enc, $data) if $enc;
+ warn "[RECV] '$type$data'" if $self->{debug};
+
+ return ($type, $data) if $type;
+ return $data;
+}
+
+sub __can_read_sock {
+ my ($self) = @_;
+ my $sock = $self->{sock};
+ my $rbuf = \($self->{rbuf});
+
+ return 1 if $$rbuf;
+ __fh_nonblocking($sock, 1);
+ my $bytes = sysread $sock, $$rbuf, $self->{read_size}, length $$rbuf;
+ __fh_nonblocking($sock, 0);
+ return 1 if $bytes;
+ return 0;
+}
+
+
+### Copied from AnyEvent::Util
+BEGIN {
+ *__fh_nonblocking = ($^O eq 'MSWin32')
+ ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; } # FIONBIO
+ : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
+}
+
+
1;
__END__