X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FRedis.pm;h=3da5f28a88b72a980aafd4bd8675333b1ff3fab1;hb=4f81a9bb14918836809b9aac316e22e22dd2d2bc;hp=9fbbab8588464c2652d0617a991badd06aac9276;hpb=5845f18c90c17241e2fd904ed21800ea6e049613;p=perl-Redis.git diff --git a/lib/Redis.pm b/lib/Redis.pm index 9fbbab8..3da5f28 100644 --- a/lib/Redis.pm +++ b/lib/Redis.pm @@ -4,8 +4,10 @@ use warnings; use strict; use IO::Socket::INET; +use Fcntl qw( O_NONBLOCK F_SETFL ); use Data::Dumper; use Carp qw/confess/; +use Encode; =head1 NAME @@ -13,14 +15,14 @@ Redis - perl binding for Redis database =cut -our $VERSION = '0.0801'; +our $VERSION = '1.2001'; =head1 DESCRIPTION Pure perl bindings for L -This version support git version 0.08 or later of Redis available at +This version supports protocol 1.2 or later of Redis available at L @@ -41,142 +43,215 @@ with same peace of code with a little help of C. =cut sub new { - my $class = shift; - my $self = {@_}; - $self->{debug} ||= $ENV{REDIS_DEBUG}; + my $class = shift; + my $self = {@_}; - $self->{sock} = IO::Socket::INET->new( - PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379', - Proto => 'tcp', - ) || die $!; + $self->{debug} ||= $ENV{REDIS_DEBUG}; + $self->{encoding} ||= 'utf8'; ## default to lax utf8 - bless($self, $class); - $self; + $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379'; + $self->{sock} = IO::Socket::INET->new( + PeerAddr => $self->{server}, + Proto => 'tcp', + ) || confess("Could not connect to Redis server at $self->{server}: $!"); + $self->{rbuf} = ''; + + $self->{is_subscriber} = 0; + + return bless($self, $class); } -my $bulk_command = { - set => 1, setnx => 1, - rpush => 1, lpush => 1, - lset => 1, lrem => 1, - sadd => 1, srem => 1, - sismember => 1, - echo => 1, - getset => 1, - smove => 1, - zadd => 1, - zrem => 1, - zscore => 1, - zincrby => 1, - append => 1, -}; - -# we don't want DESTROY to fallback into AUTOLOAD -sub DESTROY {} +### we don't want DESTROY to fallback into AUTOLOAD +sub DESTROY { } + + +### Deal with common, general case, Redis commands our $AUTOLOAD; + sub AUTOLOAD { - my $self = shift; - - my $sock = $self->{sock} || die "no server connected"; - - my $command = $AUTOLOAD; - $command =~ s/.*://; - - warn "## $command ",Dumper(@_) if $self->{debug}; - - my $send; - - if ( defined $bulk_command->{$command} ) { - my $value = pop; - $value = '' if ! defined $value; - $send - = uc($command) - . ' ' - . join(' ', @_) - . ' ' - . length( $value ) - . "\r\n$value\r\n" - ; - } else { - $send - = uc($command) - . ' ' - . join(' ', @_) - . "\r\n" - ; - } - - warn ">> $send" if $self->{debug}; - print $sock $send; - - if ( $command eq 'quit' ) { - close( $sock ) || die "can't close socket: $!"; - return 1; - } - - my $result = <$sock> || die "can't read socket: $!"; - warn "<< $result" if $self->{debug}; - my $type = substr($result,0,1); - $result = substr($result,1,-2); - - if ( $command eq 'info' ) { - my $hash; - foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) { - my ($n,$v) = split(/:/, $l, 2); - $hash->{$n} = $v; - } - return $hash; - } elsif ( $command eq 'keys' ) { - my $keys = $self->__read_bulk($result); - return split(/\s/, $keys) if $keys; - return; - } - - if ( $type eq '-' ) { - confess "[$command] $result"; - } elsif ( $type eq '+' ) { - return $result; - } elsif ( $type eq '$' ) { - return $self->__read_bulk($result); - } elsif ( $type eq '*' ) { - return $self->__read_multi_bulk($result); - } elsif ( $type eq ':' ) { - return $result; # FIXME check if int? - } else { - confess "unknown type: $type", $self->__read_line(); - } + my $self = shift; + my $sock = $self->{sock} || confess("Not connected to any server"); + my $enc = $self->{encoding}; + my $deb = $self->{debug}; + + my $command = $AUTOLOAD; + $command =~ s/.*://; + $self->__is_valid_command($command); + + $self->__send_command($command, @_); + + return $self->__read_response($command); } -sub __read_bulk { - my ($self,$len) = @_; - return undef if $len < 0; - - my $v; - if ( $len > 0 ) { - read($self->{sock}, $v, $len) || die $!; - warn "<< ",Dumper($v),$/ if $self->{debug}; - } - my $crlf; - read($self->{sock}, $crlf, 2); # skip cr/lf - return $v; + +### Commands with extra logic +sub quit { + my ($self) = @_; + + $self->__send_command('QUIT'); + + close(delete $self->{sock}) || confess("Can't close socket: $!"); + delete $self->{rbuf}; + + return 1; } -sub __read_multi_bulk { - my ($self,$size) = @_; - return undef if $size < 0; - my $sock = $self->{sock}; +sub info { + my ($self) = @_; + $self->__is_valid_command('INFO'); - $size--; + $self->__send_command('INFO'); - my @list = ( 0 .. $size ); - foreach ( 0 .. $size ) { - $list[ $_ ] = $self->__read_bulk( substr(<$sock>,1,-2) ); - } + my $info = $self->__read_response('INFO'); - warn "## list = ", Dumper( @list ) if $self->{debug}; - return @list; + return {map { split(/:/, $_, 2) } split(/\r\n/, $info)}; } +sub keys { + my $self = shift; + $self->__is_valid_command('KEYS'); + + $self->__send_command('KEYS', @_); + + my @keys = $self->__read_response('INFO', \my $type); + return @keys if $type eq '*'; + + ## Support redis <= 1.2.6 + return split(/\s/, $keys[0]) if $keys[0]; + return; +} + + +### Mode validation +sub __is_valid_command { + my ($self, $cmd) = @_; + + return unless $self->{is_subscriber}; + return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i; + confess("Cannot use command '$cmd' while in SUBSCRIBE mode, "); +} + + +### Socket operations +sub __send_command { + my $self = shift; + my $cmd = uc(shift); + my $enc = $self->{encoding}; + my $deb = $self->{debug}; + + warn "[SEND] $cmd ", Dumper([@_]) if $deb; + + ## Encode command using multi-bulk format + my $n_elems = scalar(@_) + 1; + my $buf = "\*$n_elems\r\n"; + for my $elem ($cmd, @_) { + my $bin = $enc ? encode($enc, $elem) : $elem; + $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n"; + } + + ## Send command, take care for partial writes + warn "[SEND RAW] $buf" if $deb; + my $sock = $self->{sock} || confess("Not connected to any server"); + while ($buf) { + my $len = syswrite $sock, $buf, length $buf; + confess("Could not write to Redis server: $!") + unless $len; + substr $buf, 0, $len, ""; + } + + return; +} + +sub __read_response { + my ($self, $command, $type_r) = @_; + + my ($type, $result) = $self->__read_sock; + $$type_r = $type if $type_r; + + if ($type eq '-') { + confess "[$command] $result, "; + } + elsif ($type eq '+') { + return $result; + } + elsif ($type eq '$') { + return if $result < 0; + return $self->__read_sock($result); + } + elsif ($type eq '*') { + my @list; + while ($result--) { + push @list, $self->__read_response($command); + } + return @list; + } + elsif ($type eq ':') { + return $result; + } + else { + confess "unknown answer type: $type ($result), "; + } +} + +sub __read_sock { + my ($self, $len) = @_; + my $sock = $self->{sock} || confess("Not connected to any server"); + my $enc = $self->{encoding}; + my $deb = $self->{debug}; + my $rbuf = \($self->{rbuf}); + + my ($data, $type) = ('', ''); + my $read_size = defined $len ? $len + 2 : 8192; + while (1) { + ## Read NN bytes, strip \r\n at the end + if (defined $len) { + if (length($$rbuf) >= $len + 2) { + $data = substr(substr($$rbuf, 0, $len + 2, ''), 0, -2); + last; + } + } + ## No len, means line more, read until \r\n + elsif ($$rbuf =~ s/^(.)([^\015\012]*)\015\012//) { + ($type, $data) = ($1, $2); + last; + } + + my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf; + confess("Error while reading from Redis server: $!") + unless defined $bytes; + confess("Redis server closed connection") unless $bytes; + } + + $data = decode($enc, $data) if $enc; + warn "[RECV] '$type$data'" if $self->{debug}; + + return ($type, $data) if $type; + return $data; +} + +sub __can_read_sock { + my ($self) = @_; + my $sock = $self->{sock}; + my $rbuf = \($self->{rbuf}); + + return 1 if $$rbuf; + __fh_nonblocking($sock, 1); + my $bytes = sysread $sock, $$rbuf, 8192, length $$rbuf; + __fh_nonblocking($sock, 0); + return 1 if $bytes; + return 0; +} + + +### Copied from AnyEvent::Util +BEGIN { + *__fh_nonblocking = ($^O eq 'MSWin32') + ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; } # FIONBIO + : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; }; +} + + 1; __END__ @@ -367,6 +442,14 @@ See also L for tie interface. my $info_hash = $r->info; +=head1 ENCODING + +Since Redis knows nothing about encoding, we are forcing utf-8 flag on all data received from Redis. +This change is introduced in 1.2001 version. + +This allows us to round-trip utf-8 encoded characters correctly, but might be problem if you push +binary junk into Redis and expect to get it back without utf-8 flag turned on. + =head1 AUTHOR Dobrica Pavlinusic, C<< >> @@ -417,7 +500,7 @@ L =head1 COPYRIGHT & LICENSE -Copyright 2009 Dobrica Pavlinusic, all rights reserved. +Copyright 2009-2010 Dobrica Pavlinusic, all rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.