Some refactoring to split out logging and enhance maintainability.
authorJames Barton <jbarton@jbarton-laptop.ideaworks.com>
Fri, 4 Jun 2010 17:55:46 +0000 (12:55 -0500)
committerJames Barton <jbarton@jbarton-laptop.ideaworks.com>
Fri, 4 Jun 2010 17:55:46 +0000 (12:55 -0500)
lib/Redis.pm
lib/Redis/Debug.pm [new file with mode: 0644]
t/30-Redis-PubSub.t

index 8c4aad6..c61ef2c 100644 (file)
 package Redis;
-
 use warnings;
 use strict;
-
 use IO::Socket::INET;
 use Data::Dumper;
-use Carp qw/confess/;
+use Carp qw( confess );
 use Encode;
+use bytes;
 
-=head1 NAME
-
-Redis - perl binding for Redis database
-
-=cut
+if ($ENV{REDIS_DEBUG}) {
+    require Redis::Debug;
+}
 
 our $VERSION = '1.2001';
 
+our %BulkCommands = map { $_ => 1 } qw(
+       set            setnx    rpush   lpush
+       lset       lrem     sadd    srem
+       sismember  echo     getset  smove  
+       zadd       zrem     zscore  zincrby
+       append     publish
+);
 
-=head1 DESCRIPTION
-
-Pure perl bindings for L<http://code.google.com/p/redis/>
-
-This version supports protocol 1.2 or later of Redis available at
-
-L<git://github.com/antirez/redis>
-
-This documentation
-lists commands which are exercised in test suite, but
-additinal commands will work correctly since protocol
-specifies enough information to support almost all commands
-with same peace of code with a little help of C<AUTOLOAD>.
-
-=head1 FUNCTIONS
+sub new {
+    my ($class, %args) = @_;
 
-=head2 new
+    return bless \%args => $class;
+}
 
-  my $r = Redis->new; # $ENV{REDIS_SERVER} or 127.0.0.1:6379
+sub _sock {
+    my ($self) = @_;
 
-  my $r = Redis->new( server => '192.168.0.1:6379', debug = 0 );
+    return $self->{_sock} ||= $self->_new_sock();
+}
 
-=cut
+sub _new_sock {
+    my ($self) = @_;
+    
+    return IO::Socket::INET->new(
+           PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
+           Proto    => 'tcp',
+    ) or die $!;
+}
 
-sub new {
-       my $class = shift;
-       my $self = {@_};
-       $self->{debug} ||= $ENV{REDIS_DEBUG};
+sub _send {
+    my ($self, @args) = @_;
+    
+    my $message = join ' ', map { defined $_ ? $_ : '' } @args;
+    $self->__send($message);
+}
 
-       $self->{sock} = IO::Socket::INET->new(
-               PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
-               Proto => 'tcp',
-       ) || die $!;
+sub __send {
+    my ($self, $message) = @_;
+    $self->_sock->print($message);
+}
 
-       bless($self, $class);
-       $self;
+sub _getline {
+    my ($self) = @_;
+    
+    my $line = $self->_sock->getline();
+    Encode::_utf8_on($line);
+    
+    return $line;
 }
 
-my $bulk_command = {
-       set => 1,       setnx => 1,
-       rpush => 1,     lpush => 1,
-       lset => 1,      lrem => 1,
-       sadd => 1,      srem => 1,
-       sismember => 1,
-       echo => 1,
-       getset => 1,
-       smove => 1,
-       zadd => 1,
-       zrem => 1,
-       zscore => 1,
-       zincrby => 1,
-       append => 1,
-       publish => 1,
-};
-
-# we don't want DESTROY to fallback into AUTOLOAD
 sub DESTROY {}
 
 our $AUTOLOAD;
-sub AUTOLOAD {
-       my $self = shift;
-
-       use bytes;
-
-       my $sock = $self->{sock} || die "no server connected";
 
-       my $command = $AUTOLOAD;
-       $command =~ s/.*://;
+sub AUTOLOAD {
+       my ($self, @args) = @_;
 
-       warn "## $command ",Dumper(@_) if $self->{debug};
-       
-       my ($callback, $after) = (sub {}, sub {});
+       my ($command) = $AUTOLOAD =~ m{ :: ( [^:]+ ) $ }xms;
        
-    if ( $command eq 'subscribe' ) {
-        ($callback, $after) = @_ == 3 ? splice @_, 1, 2
-                            : @_ == 2 ? (pop, sub {})
-                            :           die 'Invalid arguments for consume command';
-    }
-    
-       my $send;
-
-       if ( defined $bulk_command->{$command} ) {
-               my $value = pop;
-               $value = '' if ! defined $value;
-               $send
-                       = uc($command)
-                       . ' '
-                       . join(' ', @_)
-                       . ' ' 
-                       . length( $value )
-                       . "\r\n$value\r\n"
-                       ;
-       } elsif ( $command ne 'consume' ) {
-               $send
-                       = uc($command)
-                       . ' '
-                       . join(' ', @_)
-                       . "\r\n"
-                       ;
-       }
+       no strict 'refs';
        
-    if (defined $send) {
-           warn ">> $send" if $self->{debug};
-           print $sock $send;
-    }
+       *$AUTOLOAD = sub {
+        my ($self, @args) = @_;
+        
+        $self->_send_command($command, @args);
 
-       if ( $command eq 'quit' ) {
-               close( $sock ) || die "can't close socket: $!";
-               return 1;
-       }
+        if ($command eq 'quit') {
+               $self->_sock->close();
+               return 1;
+        }
        
-       if ($command eq 'subscribe') {
-           $self->{subscribe} = 1;
-       }
-       elsif ($command eq 'unsubscribe') {
-           $self->{subscribe} = 0;
-       }
+           if ($command eq 'unsubscribe') {
+               my @results             = $self->_read_results();
+               my $channels_subscribed = $results[-1];
+               
+               $self->{subscribed} = $channels_subscribed;
+               
+               return @results;
+           }
+
+        return $self->_read_results();
+    };
     
-    if ( $command eq 'subscribe' ) {
-        $after->( $self, $self->__read_result($command) );
-        
-        my $messages = 0;
-        
-        while ($self->{subscribe}) {
-            $callback->( $self, $self->__read_result($command) );
-            $messages++;
-        }
-        
-        return $messages;
+    goto &$AUTOLOAD;
+}
+
+sub _send_command {
+    my ($self, $command, @args) = @_;
+
+    my $value = $BulkCommands{$command} && defined $args[-1] ? pop @args : '';
+
+    if ($BulkCommands{$command}) {
+        $self->_send( uc($command), @args, length( $value ), "\r\n$value\r\n" );
     }
     else {
-        return $self->__read_result($command);
+        $self->_send( uc($command), @args, "\r\n" );
     }
 }
 
-sub __read_result {
-    my ($self, $command) = @_;
+sub _read_results {
+    my ($self, $line) = @_;
     
-       use bytes;
-
-       my $sock = $self->{sock} || die "no server connected";
-    
-       my $result = <$sock> || die "can't read socket: $!";
-       Encode::_utf8_on($result);
-       warn "<< $result" if $self->{debug};
-       my $type = substr($result,0,1);
-       $result = substr($result,1,-2);
-
-       if ( $command eq 'info' ) {
-               my $hash;
-               foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) {
-                       my ($n,$v) = split(/:/, $l, 2);
-                       $hash->{$n} = $v;
-               }
-               return $hash;
+    if (!defined $line) {
+        $line = $self->_getline();
     }
-
-       if ( $type eq '-' ) {
-               confess "[$command] $result";
-       } elsif ( $type eq '+' ) {
-               return $result;
-       } elsif ( $type eq '$' ) {
-               return $self->__read_bulk($result);
-       } elsif ( $type eq '*' ) {
-               return $self->__read_multi_bulk($result);
-       } elsif ( $type eq ':' ) {
-               return $result; # FIXME check if int?
-       } else {
-               return;
-       }
+    
+    my ($type, $result) = $line =~ m{ ^ (.) (.*?) \r\n $ }xms;
+    
+    return $type eq '-' ? confess $result
+         : $type eq '+' ? $result
+         : $type eq '$' ? $self->_read_bulk($result)
+         : $type eq '*' ? $self->_read_multi_bulk($result)
+         : $type eq ':' ? $result
+         : wantarray    ? ()
+         :                undef
+         ;
 }
 
-sub __read_bulk {
-       my ($self,$len) = @_;
+sub _read_bulk {
+       my ($self, $len) = @_;
+       
        return if $len < 0;
 
-       my $v;
+       my $value;
+       
        if ( $len > 0 ) {
-               read($self->{sock}, $v, $len) || die $!;
-               Encode::_utf8_on($v);
-               warn "<< ",Dumper($v),$/ if $self->{debug};
+               $self->_sock->read($value, $len) or die $!;
+               Encode::_utf8_on($value);
        }
+       
        my $crlf;
-       read($self->{sock}, $crlf, 2); # skip cr/lf
-       return $v;
+       $self->_sock->read($crlf, 2); # skip cr/lf
+       
+       return $value;
 }
 
-sub __read_multi_bulk {
+sub _read_multi_bulk {
        my ($self,$size) = @_;
+       
        return if $size < 0;
-       my $sock = $self->{sock};
 
-       $size--;
+       my @results = map { $self->_read_results() } ( 1 .. $size );
+    
+       return @results;
+}
 
-       my @list = ( 0 .. $size );
-       foreach ( 0 .. $size ) {
-           my $result = <$sock>;
-               $list[ $_ ] = substr($result, 0, 1) eq ':' ? substr($result, 1, -2) : $self->__read_bulk( substr($result,1,-2) );
-       }
+sub subscribe {
+    my ($self, %args) = @_;
+    
+    $args{consume}     ||= sub {};
+    $args{subscribe}   ||= sub {};
+    $args{unsubscribe} ||= sub {};
+
+       $self->_send_command('subscribe', @{ $args{channels} });
 
-       warn "## list = ", Dumper( @list ) if $self->{debug};
-       return @list;
+    $self->{subscribed} = 1;
+    
+    # Setup handler called right after the client subscribes to channels
+    $args{subscribe}->( $self, $self->_read_results() );
+    
+    while ($self->{subscribed}) {
+        # Handles each message consumed from the channel
+        $args{consume}->( $self, $self->_read_results() );
+    }
+
+    # Teardown handler called right after the client unsubscribes from the last channel
+    return $args{unsubscribe}->($self);
+}
+
+sub info {
+    my ($self) = @_;
+    
+    $self->_send_command('info');
+    
+    my $result = $self->_read_results();
+
+    my $unpair = sub {
+        my ($pair) = @_;
+        my ($name, $value) = split m{:}, $pair, 2;
+        return ($name => $value);
+    };
+    
+    my %info = map { $unpair->($_) } split m{\r\n}, $result;
+
+    return \%info;
 }
 
 1;
 
 __END__
 
+=head1 NAME
+
+Redis - perl binding for Redis database
+
+=head1 DESCRIPTION
+
+Pure perl bindings for L<http://code.google.com/p/redis/>
+
+This version supports the Redis protocol version 1.3 available at
+
+L<git://github.com/antirez/redis>
+
+This documentation lists commands which are exercised in the test suite, but
+additional commands should work correctly since the protocol
+specifies enough information to support almost all commands
+with same piece of code with a little help of C<AUTOLOAD>.
+
+=head1 FUNCTIONS
+
+=head2 new
+
+  my $r = Redis->new; # $ENV{REDIS_SERVER} or 127.0.0.1:6379
+
+  my $r = Redis->new( server => '192.168.0.1:6379', debug = 0 );
+
 =head1 Connection Handling
 
 =head2 quit
diff --git a/lib/Redis/Debug.pm b/lib/Redis/Debug.pm
new file mode 100644 (file)
index 0000000..3774fb9
--- /dev/null
@@ -0,0 +1,74 @@
+package Redis::Debug;
+use strict;
+use warnings;
+use Redis;
+
+if ($ENV{REDIS_DEBUG}) {
+    __PACKAGE__->_setup_logging();
+}
+
+sub _setup_logging {
+    my ($class) = @_;
+    
+    # Sometimes we want to log the arguments given to a method.
+    my %before_logger_for = (
+        _new_sock => sub { "Connecting to Redis server." },
+        __send    => sub { "\n>> @_" },
+    );
+    
+    # Sometimes we want to log the result of the method call.
+    my %after_logger_for = (
+        _getline         => sub { "<< @_" },
+        _read_bulk       => sub { "<< @_" },
+        _read_multi_bulk => sub { "## @_" },
+    );
+    
+    no warnings 'redefine';
+    no strict 'refs';
+    
+    # Insert logger before each method
+    for my $method (keys %before_logger_for) {
+        my $original_ref = \&{ "Redis::${method}" };
+        
+        *{ "Redis::${method}" } = sub {
+            my ($self, @args) = @_;
+            
+            my @log_items = $before_logger_for{$method}->(@args);
+            $class->_log(@log_items);
+            
+            goto $original_ref;
+        };
+    }
+    
+    # Insert logger after each method
+    for my $method (keys %after_logger_for) {
+        my $original_ref = \&{ "Redis::${method}" };
+        
+        *{ "Redis::${method}" } = sub {
+            my ($self) = @_;
+            
+            # Preserve context from caller
+            my @return_values = wantarray ? ($original_ref->(@_))
+                              :             (scalar($original_ref->(@_)))
+                              ;
+
+            my @log_items = $after_logger_for{$method}->(@return_values);
+            $class->_log(@log_items);
+            
+            return wantarray ? @return_values : $return_values[0];
+        };
+    }
+}
+
+sub _log {
+    my ($self, @items) = @_;
+
+    return if @items == 0;
+
+    my $message = ref $items[0] eq 'CODE' ? join ' ', $items[0]->() : "@items";
+    chomp $message;
+    warn "$message\n";
+}
+
+1;
+
index 5d36856..32eaf42 100644 (file)
@@ -15,16 +15,20 @@ diag "Publish / subscribe commands";
 ok( my $p = Redis->new(), 'publisher' );
 ok( my $s = Redis->new(), 'subscriber' );
 
-my $callback = sub {
+my $consume = sub {
     my ($self, @result) = @_;
 
     is_deeply( [ @result ], ['message', 'channel-1', 'test message'] );
     is_deeply( [ $self->unsubscribe( 'channel-1' ) ], [ 'unsubscribe', 'channel-1', 0 ] );
 };
 
-my $after = sub {
+my $subscribe = sub {
     cmp_ok( $p->publish( 'channel-1', 'test message' ), '==', 1, '1 client received published message' );
 };
 
-ok $s->subscribe( 'channel-1', $callback, $after );
+my $unsubscribe = sub {
+    return 234;
+};
+
+cmp_ok $s->subscribe( channels => ['channel-1'], subscribe => $subscribe, consume => $consume, unsubscribe => $unsubscribe ), '==', 234;