sub new {
my $class = shift;
- my $self = {@_};
+ my $self = {@_};
+
$self->{debug} ||= $ENV{REDIS_DEBUG};
+ $self->{encoding} ||= 'utf8'; ## default to lax utf8
+ $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
$self->{sock} = IO::Socket::INET->new(
- PeerAddr => $self->{server} || $ENV{REDIS_SERVER} || '127.0.0.1:6379',
+ PeerAddr => $self->{server},
Proto => 'tcp',
- ) || die $!;
+ ) || confess("Could not connect to Redis server at $self->{server}: $!");
- bless($self, $class);
- $self;
+ return bless($self, $class);
}
-my $bulk_command = {
- set => 1, setnx => 1,
- rpush => 1, lpush => 1,
- lset => 1, lrem => 1,
- sadd => 1, srem => 1,
- sismember => 1,
- echo => 1,
- getset => 1,
- smove => 1,
- zadd => 1,
- zrem => 1,
- zscore => 1,
- zincrby => 1,
- append => 1,
-};
-
# we don't want DESTROY to fallback into AUTOLOAD
sub DESTROY {}
+
+### Deal with common, general case, Redis commands
our $AUTOLOAD;
sub AUTOLOAD {
my $self = shift;
-
- use bytes;
-
- my $sock = $self->{sock} || die "no server connected";
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
my $command = $AUTOLOAD;
$command =~ s/.*://;
- warn "## $command ",Dumper(@_) if $self->{debug};
-
- my $send;
-
- if ( defined $bulk_command->{$command} ) {
- my $value = pop;
- $value = '' if ! defined $value;
- $send
- = uc($command)
- . ' '
- . join(' ', @_)
- . ' '
- . length( $value )
- . "\r\n$value\r\n"
- ;
- } else {
- $send
- = uc($command)
- . ' '
- . join(' ', @_)
- . "\r\n"
- ;
- }
-
- warn ">> $send" if $self->{debug};
- print $sock $send;
+ $self->__send_command($command, @_);
- if ( $command eq 'quit' ) {
- close( $sock ) || die "can't close socket: $!";
- return 1;
- }
-
- my $result = <$sock> || die "can't read socket: $!";
- Encode::_utf8_on($result);
- warn "<< $result" if $self->{debug};
+ my $result = <$sock> || confess("Can't read socket: $!");
my $type = substr($result,0,1);
$result = substr($result,1,-2);
+ $result = decode($enc, $result) if $enc;
+ warn "[RECV] '$type$result'" if $deb;
+
if ( $command eq 'info' ) {
my $hash;
foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) {
}
return $hash;
} elsif ( $command eq 'keys' ) {
+ return $self->__read_multi_bulk($result)
+ if $type eq '*';
my $keys = $self->__read_bulk($result);
return split(/\s/, $keys) if $keys;
return;
}
}
+
+### Commands with extra logic
+
+sub quit {
+ my ($self) = @_;
+
+ $self->__send_command('QUIT');
+
+ close(delete $self->{sock}) || confess("Can't close socket: $!");
+ return 1;
+}
+
+
+### Socket operations
+
+sub __send_command {
+ my $self = shift;
+ my $cmd = uc(shift);
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
+
+ warn "[SEND] $cmd ", Dumper([@_]) if $deb;
+
+ ## Encode command using multi-bulk format
+ my $n_elems = scalar(@_) + 1;
+ my $buf = "\*$n_elems\r\n";
+ for my $elem ($cmd, @_) {
+ my $bin = $enc ? encode($enc, $elem) : $elem;
+ $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
+ }
+
+ ## Send command, take care for partial writes
+ warn "[SEND RAW] $buf" if $deb;
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ while ($buf) {
+ my $len = syswrite $sock, $buf, length $buf;
+ confess("Could not write to Redis server: $!")
+ unless $len;
+ substr $buf, 0, $len, "";
+ }
+
+ return;
+}
+
sub __read_bulk {
my ($self,$len) = @_;
- return undef if $len < 0;
+ return if $len < 0;
- my $v;
+ my $enc = $self->{encoding};
+ my $v = '';
if ( $len > 0 ) {
- read($self->{sock}, $v, $len) || die $!;
- Encode::_utf8_on($v);
- warn "<< ",Dumper($v),$/ if $self->{debug};
+ read($self->{sock}, $v, $len) || confess("Could not read from sock: $!");
+ $v = decode($enc, $v) if $enc;
}
my $crlf;
read($self->{sock}, $crlf, 2); # skip cr/lf
+
+ warn "[PARSE] read_bulk ".Dumper($v) if $self->{debug};
return $v;
}
sub __read_multi_bulk {
my ($self,$size) = @_;
- return undef if $size < 0;
- my $sock = $self->{sock};
-
- $size--;
+ return if $size <= 0;
- my @list = ( 0 .. $size );
- foreach ( 0 .. $size ) {
- $list[ $_ ] = $self->__read_bulk( substr(<$sock>,1,-2) );
+ my $sock = $self->{sock};
+ my $deb = $self->{debug};
+ my $enc = $self->{encoding};
+ my @list;
+ while ($size--) {
+ my $v = $self->__read_bulk( substr(<$sock>,1,-2) );
+ $v = decode($enc, $v) if $enc;
+ warn " [PARSE] read_multi_bulk ($size) ".Dumper($v) if $deb;
+ push @list, $v;
}
- warn "## list = ", Dumper( @list ) if $self->{debug};
+ warn "[PARSE] multi_bulk ".Dumper( \@list ) if $deb;
return @list;
}