8 use Fcntl qw( O_NONBLOCK F_SETFL );
15 Redis - perl binding for Redis database
19 our $VERSION = '1.2001';
24 Pure perl bindings for L<http://code.google.com/p/redis/>
26 This version supports protocol 1.2 or later of Redis available at
28 L<git://github.com/antirez/redis>
31 lists commands which are exercised in test suite, but
32 additinal commands will work correctly since protocol
33 specifies enough information to support almost all commands
34 with same peace of code with a little help of C<AUTOLOAD>.
40 my $r = Redis->new; # $ENV{REDIS_SERVER} or 127.0.0.1:6379
42 my $r = Redis->new( server => '192.168.0.1:6379', debug = 0 );
50 $self->{debug} ||= $ENV{REDIS_DEBUG};
51 $self->{encoding} ||= 'utf8'; ## default to lax utf8
53 $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
54 $self->{sock} = IO::Socket::INET->new(
55 PeerAddr => $self->{server},
57 ) || confess("Could not connect to Redis server at $self->{server}: $!");
59 $self->{read_size} = 8192;
62 $self->{is_subscriber} = 0;
63 $self->{subscribers} = {};
65 return bless($self, $class);
68 sub is_subscriber { $_[0]{is_subscriber} }
71 ### we don't want DESTROY to fallback into AUTOLOAD
75 ### Deal with common, general case, Redis commands
80 my $sock = $self->{sock} || confess("Not connected to any server");
81 my $enc = $self->{encoding};
82 my $deb = $self->{debug};
84 my $command = $AUTOLOAD;
86 $self->__is_valid_command($command);
88 ## PubSub commands use a different answer handling
89 if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
93 confess("Missing required callback in call to $command(), ")
94 unless ref($cb) eq 'CODE';
97 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
101 $self->__send_command($command, @subs);
103 my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
104 return $self->__process_subscription_changes($command, \%cbs);
107 $self->__send_command($command, @_);
108 return $self->__read_response($command);
112 ### Commands with extra logic
116 $self->__send_command('QUIT');
118 delete $self->{rbuf};
119 close(delete $self->{sock}) || confess("Can't close socket: $!");
126 $self->__is_valid_command('INFO');
128 $self->__send_command('INFO');
130 my $info = $self->__read_response('INFO');
132 return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
137 $self->__is_valid_command('KEYS');
139 $self->__send_command('KEYS', @_);
141 my @keys = $self->__read_response('KEYS', \my $type);
142 ## Support redis > 1.26
143 return @keys if $type eq '*';
145 ## Support redis <= 1.2.6
146 return split(/\s/, $keys[0]) if $keys[0];
152 sub wait_for_messages {
153 my ($self, $timeout) = @_;
155 my $s = IO::Select->new;
156 $s->add($self->{sock});
159 while ($s->can_read($timeout)) {
160 while ($self->__can_read_sock) {
161 my @m = $self->__read_response('WAIT_FOR_MESSAGES');
162 $self->__process_pubsub_msg(\@m);
170 sub __process_unsubscribe_requests {
171 my ($self, $cb, $pr, @unsubs) = @_;
172 my $subs = $self->{subscribers};
174 my @subs_to_unsubscribe;
175 for my $sub (@unsubs) {
176 my $key = "${pr}message:$sub";
177 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
180 delete $subs->{$key};
181 push @subs_to_unsubscribe, $sub;
184 return @subs_to_unsubscribe;
187 sub __process_subscription_changes {
188 my ($self, $cmd, $expected) = @_;
189 my $subs = $self->{subscribers};
192 my @m = $self->__read_response($cmd);
194 ## Deal with pending PUBLISH'ed messages
195 if ($m[0] =~ /^p?message$/) {
196 $self->__process_pubsub_msg(\@m);
200 my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
201 $key .= "message:$m[1]";
202 my $cb = delete $expected->{$key};
204 push @{$subs->{$key}}, $cb unless $unsub;
206 $self->{is_subscriber} = $m[2];
210 sub __process_pubsub_msg {
212 my $subs = $self->{subscribers};
215 my $cbid = "$m->[0]:$sub";
217 my $topic = $m->[2] || $sub;
219 if (!exists $subs->{$cbid}) {
220 warn "Message for topic '$topic' ($cbid) without expected callback, ";
224 $_->($data, $topic, $sub) for @{$subs->{$cbid}};
232 sub __is_valid_command {
233 my ($self, $cmd) = @_;
235 return unless $self->{is_subscriber};
236 return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
237 confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
241 ### Socket operations
245 my $enc = $self->{encoding};
246 my $deb = $self->{debug};
248 warn "[SEND] $cmd ", Dumper([@_]) if $deb;
250 ## Encode command using multi-bulk format
251 my $n_elems = scalar(@_) + 1;
252 my $buf = "\*$n_elems\r\n";
253 for my $elem ($cmd, @_) {
254 my $bin = $enc ? encode($enc, $elem) : $elem;
255 $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
258 ## Send command, take care for partial writes
259 warn "[SEND RAW] $buf" if $deb;
260 my $sock = $self->{sock} || confess("Not connected to any server");
262 my $len = syswrite $sock, $buf, length $buf;
263 confess("Could not write to Redis server: $!")
265 substr $buf, 0, $len, "";
271 sub __read_response {
272 my ($self, $command, $type_r) = @_;
274 my ($type, $result) = $self->__read_sock;
275 $$type_r = $type if $type_r;
278 confess "[$command] $result, ";
280 elsif ($type eq '+') {
283 elsif ($type eq '$') {
284 return if $result < 0;
285 return $self->__read_sock($result);
287 elsif ($type eq '*') {
290 push @list, $self->__read_response($command);
294 elsif ($type eq ':') {
298 confess "unknown answer type: $type ($result), ";
303 my ($self, $len) = @_;
304 my $sock = $self->{sock} || confess("Not connected to any server");
305 my $enc = $self->{encoding};
306 my $deb = $self->{debug};
307 my $rbuf = \($self->{rbuf});
309 my ($data, $type) = ('', '');
310 my $read_size = $self->{read_size};
311 $read_size = $len + 2 if defined $len && $len + 2 > $read_size;
314 ## Read NN bytes, strip \r\n at the end
316 if (length($$rbuf) >= $len + 2) {
317 $data = substr(substr($$rbuf, 0, $len + 2, ''), 0, -2);
321 ## No len, means line more, read until \r\n
322 elsif ($$rbuf =~ s/^(.)([^\015\012]*)\015\012//) {
323 ($type, $data) = ($1, $2);
327 my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf;
328 confess("Error while reading from Redis server: $!")
329 unless defined $bytes;
330 confess("Redis server closed connection") unless $bytes;
333 $data = decode($enc, $data) if $enc;
334 warn "[RECV] '$type$data'" if $self->{debug};
336 return ($type, $data) if $type;
340 sub __can_read_sock {
342 my $sock = $self->{sock};
343 my $rbuf = \($self->{rbuf});
346 __fh_nonblocking($sock, 1);
347 my $bytes = sysread $sock, $$rbuf, $self->{read_size}, length $$rbuf;
348 __fh_nonblocking($sock, 0);
354 ### Copied from AnyEvent::Util
356 *__fh_nonblocking = ($^O eq 'MSWin32')
357 ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; } # FIONBIO
358 : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
366 =head1 Connection Handling
374 $r->ping || die "no server?";
376 =head1 Commands operating on string values
380 $r->set( foo => 'bar' );
382 $r->setnx( foo => 42 );
386 my $value = $r->get( 'foo' );
390 my @values = $r->mget( 'foo', 'bar', 'baz' );
396 $r->incrby('tripplets', 3);
402 $r->decrby('tripplets', 3);
406 $r->exists( 'key' ) && print "got key!";
410 $r->del( 'key' ) || warn "key doesn't exist";
414 $r->type( 'key' ); # = string
416 =head1 Commands operating on the key space
420 my @keys = $r->keys( '*glob_pattern*' );
424 my $key = $r->randomkey;
428 my $ok = $r->rename( 'old-key', 'new-key', $new );
432 my $nr_keys = $r->dbsize;
434 =head1 Commands operating on lists
436 See also L<Redis::List> for tie interface.
440 $r->rpush( $key, $value );
444 $r->lpush( $key, $value );
452 my @list = $r->lrange( $key, $start, $end );
456 my $ok = $r->ltrim( $key, $start, $end );
460 $r->lindex( $key, $index );
464 $r->lset( $key, $index, $value );
468 my $modified_count = $r->lrem( $key, $count, $value );
472 my $value = $r->lpop( $key );
476 my $value = $r->rpop( $key );
478 =head1 Commands operating on sets
482 $r->sadd( $key, $member );
486 $r->srem( $key, $member );
490 my $elements = $r->scard( $key );
494 $r->sismember( $key, $member );
498 $r->sinter( $key1, $key2, ... );
502 my $ok = $r->sinterstore( $dstkey, $key1, $key2, ... );
504 =head1 Multiple databases handling commands
508 $r->select( $dbindex ); # 0 for new clients
512 $r->move( $key, $dbindex );
526 $r->sort("key BY pattern LIMIT start end GET pattern ASC|DESC ALPHA');
528 =head1 Persistence control commands
546 =head1 Remote server control commands
550 my $info_hash = $r->info;
554 Since Redis knows nothing about encoding, we are forcing utf-8 flag on all data received from Redis.
555 This change is introduced in 1.2001 version.
557 This allows us to round-trip utf-8 encoded characters correctly, but might be problem if you push
558 binary junk into Redis and expect to get it back without utf-8 flag turned on.
562 Dobrica Pavlinusic, C<< <dpavlin at rot13.org> >>
566 Please report any bugs or feature requests to C<bug-redis at rt.cpan.org>, or through
567 the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Redis>. I will be notified, and then you'll
568 automatically be notified of progress on your bug as I make changes.
575 You can find documentation for this module with the perldoc command.
582 You can also look for information at:
586 =item * RT: CPAN's request tracker
588 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Redis>
590 =item * AnnoCPAN: Annotated CPAN documentation
592 L<http://annocpan.org/dist/Redis>
596 L<http://cpanratings.perl.org/d/Redis>
600 L<http://search.cpan.org/dist/Redis>
605 =head1 ACKNOWLEDGEMENTS
608 =head1 COPYRIGHT & LICENSE
610 Copyright 2009-2010 Dobrica Pavlinusic, all rights reserved.
612 This program is free software; you can redistribute it and/or modify it
613 under the same terms as Perl itself.