package Redis;
-
use warnings;
use strict;
-
use IO::Socket::INET;
use Data::Dumper;
-use Carp qw/confess/;
+use Carp qw( confess );
use Encode;
+use bytes;
-=head1 NAME
-
-Redis - perl binding for Redis database
-
-=cut
+if ($ENV{REDIS_DEBUG}) {
+ require Redis::Debug;
+}
our $VERSION = '1.2001';
+our %BulkCommands = map { $_ => 1 } qw(
+ set setnx rpush lpush
+ lset lrem sadd srem
+ sismember echo getset smove
+ zadd zrem zscore zincrby
+ append publish
+);
-=head1 DESCRIPTION
-
-Pure perl bindings for L<http://code.google.com/p/redis/>
-
-This version supports protocol 1.2 or later of Redis available at
-
-L<git://github.com/antirez/redis>
-
-This documentation
-lists commands which are exercised in test suite, but
-additinal commands will work correctly since protocol
-specifies enough information to support almost all commands
-with same peace of code with a little help of C<AUTOLOAD>.
-
-=head1 FUNCTIONS
+sub new {
+ my ($class, %args) = @_;
-=head2 new
+ return bless \%args => $class;
+}
- my $r = Redis->new; # $ENV{REDIS_SERVER} or 127.0.0.1:6379
+sub _sock {
+ my ($self) = @_;
- my $r = Redis->new( server => '192.168.0.1:6379', debug = 0 );
+ return $self->{_sock} ||= $self->_new_sock();
+}
-=cut
+sub _new_sock {
+ my ($self) = @_;
+
+ return IO::Socket::INET->new(
+ PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
+ Proto => 'tcp',
+ ) or die $!;
+}
-sub new {
- my $class = shift;
- my $self = {@_};
- $self->{debug} ||= $ENV{REDIS_DEBUG};
+sub _send {
+ my ($self, @args) = @_;
+
+ my $message = join ' ', map { defined $_ ? $_ : '' } @args;
+ $self->__send($message);
+}
- $self->{sock} = IO::Socket::INET->new(
- PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
- Proto => 'tcp',
- ) || die $!;
+sub __send {
+ my ($self, $message) = @_;
+ $self->_sock->print($message);
+}
- bless($self, $class);
- $self;
+sub _getline {
+ my ($self) = @_;
+
+ my $line = $self->_sock->getline();
+ Encode::_utf8_on($line);
+
+ return $line;
}
-my $bulk_command = {
- set => 1, setnx => 1,
- rpush => 1, lpush => 1,
- lset => 1, lrem => 1,
- sadd => 1, srem => 1,
- sismember => 1,
- echo => 1,
- getset => 1,
- smove => 1,
- zadd => 1,
- zrem => 1,
- zscore => 1,
- zincrby => 1,
- append => 1,
- publish => 1,
-};
-
-# we don't want DESTROY to fallback into AUTOLOAD
sub DESTROY {}
our $AUTOLOAD;
-sub AUTOLOAD {
- my $self = shift;
-
- use bytes;
-
- my $sock = $self->{sock} || die "no server connected";
- my $command = $AUTOLOAD;
- $command =~ s/.*://;
+sub AUTOLOAD {
+ my ($self, @args) = @_;
- warn "## $command ",Dumper(@_) if $self->{debug};
-
- my ($callback, $after) = (sub {}, sub {});
+ my ($command) = $AUTOLOAD =~ m{ :: ( [^:]+ ) $ }xms;
- if ( $command eq 'subscribe' ) {
- ($callback, $after) = @_ == 3 ? splice @_, 1, 2
- : @_ == 2 ? (pop, sub {})
- : die 'Invalid arguments for consume command';
- }
-
- my $send;
-
- if ( defined $bulk_command->{$command} ) {
- my $value = pop;
- $value = '' if ! defined $value;
- $send
- = uc($command)
- . ' '
- . join(' ', @_)
- . ' '
- . length( $value )
- . "\r\n$value\r\n"
- ;
- } elsif ( $command ne 'consume' ) {
- $send
- = uc($command)
- . ' '
- . join(' ', @_)
- . "\r\n"
- ;
- }
+ no strict 'refs';
- if (defined $send) {
- warn ">> $send" if $self->{debug};
- print $sock $send;
- }
+ *$AUTOLOAD = sub {
+ my ($self, @args) = @_;
+
+ $self->_send_command($command, @args);
- if ( $command eq 'quit' ) {
- close( $sock ) || die "can't close socket: $!";
- return 1;
- }
+ if ($command eq 'quit') {
+ $self->_sock->close();
+ return 1;
+ }
- if ($command eq 'subscribe') {
- $self->{subscribe} = 1;
- }
- elsif ($command eq 'unsubscribe') {
- $self->{subscribe} = 0;
- }
+ if ($command eq 'unsubscribe') {
+ my @results = $self->_read_results();
+ my $channels_subscribed = $results[-1];
+
+ $self->{subscribed} = $channels_subscribed;
+
+ return @results;
+ }
+
+ return $self->_read_results();
+ };
- if ( $command eq 'subscribe' ) {
- $after->( $self, $self->__read_result($command) );
-
- my $messages = 0;
-
- while ($self->{subscribe}) {
- $callback->( $self, $self->__read_result($command) );
- $messages++;
- }
-
- return $messages;
+ goto &$AUTOLOAD;
+}
+
+sub _send_command {
+ my ($self, $command, @args) = @_;
+
+ my $value = $BulkCommands{$command} && defined $args[-1] ? pop @args : '';
+
+ if ($BulkCommands{$command}) {
+ $self->_send( uc($command), @args, length( $value ), "\r\n$value\r\n" );
}
else {
- return $self->__read_result($command);
+ $self->_send( uc($command), @args, "\r\n" );
}
}
-sub __read_result {
- my ($self, $command) = @_;
+sub _read_results {
+ my ($self, $line) = @_;
- use bytes;
-
- my $sock = $self->{sock} || die "no server connected";
-
- my $result = <$sock> || die "can't read socket: $!";
- Encode::_utf8_on($result);
- warn "<< $result" if $self->{debug};
- my $type = substr($result,0,1);
- $result = substr($result,1,-2);
-
- if ( $command eq 'info' ) {
- my $hash;
- foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) {
- my ($n,$v) = split(/:/, $l, 2);
- $hash->{$n} = $v;
- }
- return $hash;
+ if (!defined $line) {
+ $line = $self->_getline();
}
-
- if ( $type eq '-' ) {
- confess "[$command] $result";
- } elsif ( $type eq '+' ) {
- return $result;
- } elsif ( $type eq '$' ) {
- return $self->__read_bulk($result);
- } elsif ( $type eq '*' ) {
- return $self->__read_multi_bulk($result);
- } elsif ( $type eq ':' ) {
- return $result; # FIXME check if int?
- } else {
- return;
- }
+
+ my ($type, $result) = $line =~ m{ ^ (.) (.*?) \r\n $ }xms;
+
+ return $type eq '-' ? confess $result
+ : $type eq '+' ? $result
+ : $type eq '$' ? $self->_read_bulk($result)
+ : $type eq '*' ? $self->_read_multi_bulk($result)
+ : $type eq ':' ? $result
+ : wantarray ? ()
+ : undef
+ ;
}
-sub __read_bulk {
- my ($self,$len) = @_;
+sub _read_bulk {
+ my ($self, $len) = @_;
+
return if $len < 0;
- my $v;
+ my $value;
+
if ( $len > 0 ) {
- read($self->{sock}, $v, $len) || die $!;
- Encode::_utf8_on($v);
- warn "<< ",Dumper($v),$/ if $self->{debug};
+ $self->_sock->read($value, $len) or die $!;
+ Encode::_utf8_on($value);
}
+
my $crlf;
- read($self->{sock}, $crlf, 2); # skip cr/lf
- return $v;
+ $self->_sock->read($crlf, 2); # skip cr/lf
+
+ return $value;
}
-sub __read_multi_bulk {
+sub _read_multi_bulk {
my ($self,$size) = @_;
+
return if $size < 0;
- my $sock = $self->{sock};
- $size--;
+ my @results = map { $self->_read_results() } ( 1 .. $size );
+
+ return @results;
+}
- my @list = ( 0 .. $size );
- foreach ( 0 .. $size ) {
- my $result = <$sock>;
- $list[ $_ ] = substr($result, 0, 1) eq ':' ? substr($result, 1, -2) : $self->__read_bulk( substr($result,1,-2) );
- }
+sub subscribe {
+ my ($self, %args) = @_;
+
+ $args{consume} ||= sub {};
+ $args{subscribe} ||= sub {};
+ $args{unsubscribe} ||= sub {};
+
+ $self->_send_command('subscribe', @{ $args{channels} });
- warn "## list = ", Dumper( @list ) if $self->{debug};
- return @list;
+ $self->{subscribed} = 1;
+
+ # Setup handler called right after the client subscribes to channels
+ $args{subscribe}->( $self, $self->_read_results() );
+
+ while ($self->{subscribed}) {
+ # Handles each message consumed from the channel
+ $args{consume}->( $self, $self->_read_results() );
+ }
+
+ # Teardown handler called right after the client unsubscribes from the last channel
+ return $args{unsubscribe}->($self);
+}
+
+sub info {
+ my ($self) = @_;
+
+ $self->_send_command('info');
+
+ my $result = $self->_read_results();
+
+ my $unpair = sub {
+ my ($pair) = @_;
+ my ($name, $value) = split m{:}, $pair, 2;
+ return ($name => $value);
+ };
+
+ my %info = map { $unpair->($_) } split m{\r\n}, $result;
+
+ return \%info;
}
1;
__END__
+=head1 NAME
+
+Redis - perl binding for Redis database
+
+=head1 DESCRIPTION
+
+Pure perl bindings for L<http://code.google.com/p/redis/>
+
+This version supports the Redis protocol version 1.3 available at
+
+L<git://github.com/antirez/redis>
+
+This documentation lists commands which are exercised in the test suite, but
+additional commands should work correctly since the protocol
+specifies enough information to support almost all commands
+with same piece of code with a little help of C<AUTOLOAD>.
+
+=head1 FUNCTIONS
+
+=head2 new
+
+ my $r = Redis->new; # $ENV{REDIS_SERVER} or 127.0.0.1:6379
+
+ my $r = Redis->new( server => '192.168.0.1:6379', debug = 0 );
+
=head1 Connection Handling
=head2 quit
--- /dev/null
+package Redis::Debug;
+use strict;
+use warnings;
+use Redis;
+
+if ($ENV{REDIS_DEBUG}) {
+ __PACKAGE__->_setup_logging();
+}
+
+sub _setup_logging {
+ my ($class) = @_;
+
+ # Sometimes we want to log the arguments given to a method.
+ my %before_logger_for = (
+ _new_sock => sub { "Connecting to Redis server." },
+ __send => sub { "\n>> @_" },
+ );
+
+ # Sometimes we want to log the result of the method call.
+ my %after_logger_for = (
+ _getline => sub { "<< @_" },
+ _read_bulk => sub { "<< @_" },
+ _read_multi_bulk => sub { "## @_" },
+ );
+
+ no warnings 'redefine';
+ no strict 'refs';
+
+ # Insert logger before each method
+ for my $method (keys %before_logger_for) {
+ my $original_ref = \&{ "Redis::${method}" };
+
+ *{ "Redis::${method}" } = sub {
+ my ($self, @args) = @_;
+
+ my @log_items = $before_logger_for{$method}->(@args);
+ $class->_log(@log_items);
+
+ goto $original_ref;
+ };
+ }
+
+ # Insert logger after each method
+ for my $method (keys %after_logger_for) {
+ my $original_ref = \&{ "Redis::${method}" };
+
+ *{ "Redis::${method}" } = sub {
+ my ($self) = @_;
+
+ # Preserve context from caller
+ my @return_values = wantarray ? ($original_ref->(@_))
+ : (scalar($original_ref->(@_)))
+ ;
+
+ my @log_items = $after_logger_for{$method}->(@return_values);
+ $class->_log(@log_items);
+
+ return wantarray ? @return_values : $return_values[0];
+ };
+ }
+}
+
+sub _log {
+ my ($self, @items) = @_;
+
+ return if @items == 0;
+
+ my $message = ref $items[0] eq 'CODE' ? join ' ', $items[0]->() : "@items";
+ chomp $message;
+ warn "$message\n";
+}
+
+1;
+