3829007d9d4619f4d0a0a2b8b28e72b014f6551a
[perl-Redis.git] / lib / Redis.pm
1 package Redis;
2
3 use warnings;
4 use strict;
5
6 use IO::Socket::INET;
7 use IO::Select;
8 use Fcntl qw( O_NONBLOCK F_SETFL );
9 use Data::Dumper;
10 use Carp qw/confess/;
11 use Encode;
12
13 =head1 NAME
14
15 Redis - perl binding for Redis database
16
17 =cut
18
19 our $VERSION = '1.2001';
20
21
22 =head1 DESCRIPTION
23
24 Pure perl bindings for L<http://code.google.com/p/redis/>
25
26 This version supports protocol 1.2 or later of Redis available at
27
28 L<git://github.com/antirez/redis>
29
30 This documentation
31 lists commands which are exercised in test suite, but
32 additinal commands will work correctly since protocol
33 specifies enough information to support almost all commands
34 with same peace of code with a little help of C<AUTOLOAD>.
35
36 =head1 FUNCTIONS
37
38 =head2 new
39
40   my $r = Redis->new; # $ENV{REDIS_SERVER} or 127.0.0.1:6379
41
42   my $r = Redis->new( server => '192.168.0.1:6379', debug = 0 );
43
44 =cut
45
46 sub new {
47   my $class = shift;
48   my $self  = {@_};
49
50   $self->{debug} ||= $ENV{REDIS_DEBUG};
51   $self->{encoding} ||= 'utf8';    ## default to lax utf8
52
53   $self->{server} ||= $ENV{REDIS_SERVER} || '127.0.0.1:6379';
54   $self->{sock} = IO::Socket::INET->new(
55     PeerAddr => $self->{server},
56     Proto    => 'tcp',
57   ) || confess("Could not connect to Redis server at $self->{server}: $!");
58
59   $self->{read_size} = 8192;
60   $self->{rbuf}      = '';
61
62   $self->{is_subscriber} = 0;
63   $self->{subscribers}   = {};
64
65   return bless($self, $class);
66 }
67
68 sub is_subscriber { $_[0]{is_subscriber} }
69
70
71 ### we don't want DESTROY to fallback into AUTOLOAD
72 sub DESTROY { }
73
74
75 ### Deal with common, general case, Redis commands
76 our $AUTOLOAD;
77
78 sub AUTOLOAD {
79   my $self = shift;
80   my $sock = $self->{sock} || confess("Not connected to any server");
81   my $enc  = $self->{encoding};
82   my $deb  = $self->{debug};
83
84   my $command = $AUTOLOAD;
85   $command =~ s/.*://;
86   $self->__is_valid_command($command);
87
88   ## PubSub commands use a different answer handling
89   if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
90     $pr = '' unless $pr;
91
92     my $cb = pop;
93     confess("Missing required callback in call to $command(), ")
94       unless ref($cb) eq 'CODE';
95
96     my @subs = @_;
97     @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
98       if $unsub;
99     return unless @subs;
100
101     $self->__send_command($command, @subs);
102
103     my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
104     return $self->__process_subscription_changes($command, \%cbs);
105   }
106
107   $self->__send_command($command, @_);
108   return $self->__read_response($command);
109 }
110
111
112 ### Commands with extra logic
113 sub quit {
114   my ($self) = @_;
115
116   $self->__send_command('QUIT');
117
118   delete $self->{rbuf};
119   close(delete $self->{sock}) || confess("Can't close socket: $!");
120
121   return 1;
122 }
123
124 sub info {
125   my ($self) = @_;
126   $self->__is_valid_command('INFO');
127
128   $self->__send_command('INFO');
129
130   my $info = $self->__read_response('INFO');
131
132   return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
133 }
134
135 sub keys {
136   my $self = shift;
137   $self->__is_valid_command('KEYS');
138
139   $self->__send_command('KEYS', @_);
140
141   my @keys = $self->__read_response('KEYS', \my $type);
142   ## Support redis > 1.26
143   return @keys if $type eq '*';
144
145   ## Support redis <= 1.2.6
146   return split(/\s/, $keys[0]) if $keys[0];
147   return;
148 }
149
150
151 ### PubSub
152 sub wait_for_messages {
153   my ($self, $timeout) = @_;
154
155   my $s = IO::Select->new;
156   $s->add($self->{sock});
157
158   my $count = 0;
159   while ($s->can_read($timeout)) {
160     while ($self->__can_read_sock) {
161       my @m = $self->__read_response('WAIT_FOR_MESSAGES');
162       $self->__process_pubsub_msg(\@m);
163       $count++;
164     }
165   }
166
167   return $count;
168 }
169
170 sub __process_unsubscribe_requests {
171   my ($self, $cb, $pr, @unsubs) = @_;
172   my $subs = $self->{subscribers};
173
174   my @subs_to_unsubscribe;
175   for my $sub (@unsubs) {
176     my $key = "${pr}message:$sub";
177     my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
178     next if @$cbs;
179
180     delete $subs->{$key};
181     push @subs_to_unsubscribe, $sub;
182   }
183
184   return @subs_to_unsubscribe;
185 }
186
187 sub __process_subscription_changes {
188   my ($self, $cmd, $expected) = @_;
189   my $subs = $self->{subscribers};
190
191   while (%$expected) {
192     my @m = $self->__read_response($cmd);
193
194     ## Deal with pending PUBLISH'ed messages
195     if ($m[0] =~ /^p?message$/) {
196       $self->__process_pubsub_msg(\@m);
197       next;
198     }
199
200     my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
201     $key .= "message:$m[1]";
202     my $cb = delete $expected->{$key};
203
204     push @{$subs->{$key}}, $cb unless $unsub;
205
206     $self->{is_subscriber} = $m[2];
207   }
208 }
209
210 sub __process_pubsub_msg {
211   my ($self, $m) = @_;
212   my $subs = $self->{subscribers};
213
214   my $sub   = $m->[1];
215   my $cbid  = "$m->[0]:$sub";
216   my $data  = pop @$m;
217   my $topic = $m->[2] || $sub;
218
219   if (!exists $subs->{$cbid}) {
220     warn "Message for topic '$topic' ($cbid) without expected callback, ";
221     return;
222   }
223
224   $_->($data, $topic, $sub) for @{$subs->{$cbid}};
225
226   return 1;
227
228 }
229
230
231 ### Mode validation
232 sub __is_valid_command {
233   my ($self, $cmd) = @_;
234
235   return unless $self->{is_subscriber};
236   return if $cmd =~ /^P?(UN)?SUBSCRIBE$/i;
237   confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
238 }
239
240
241 ### Socket operations
242 sub __send_command {
243   my $self = shift;
244   my $cmd  = uc(shift);
245   my $enc  = $self->{encoding};
246   my $deb  = $self->{debug};
247
248   warn "[SEND] $cmd ", Dumper([@_]) if $deb;
249
250   ## Encode command using multi-bulk format
251   my $n_elems = scalar(@_) + 1;
252   my $buf     = "\*$n_elems\r\n";
253   for my $elem ($cmd, @_) {
254     my $bin = $enc ? encode($enc, $elem) : $elem;
255     $buf .= defined($bin) ? '$' . length($bin) . "\r\n$bin\r\n" : "\$-1\r\n";
256   }
257
258   ## Send command, take care for partial writes
259   warn "[SEND RAW] $buf" if $deb;
260   my $sock = $self->{sock} || confess("Not connected to any server");
261   while ($buf) {
262     my $len = syswrite $sock, $buf, length $buf;
263     confess("Could not write to Redis server: $!")
264       unless $len;
265     substr $buf, 0, $len, "";
266   }
267
268   return;
269 }
270
271 sub __read_response {
272   my ($self, $command, $type_r) = @_;
273
274   my ($type, $result) = $self->__read_sock;
275   $$type_r = $type if $type_r;
276
277   if ($type eq '-') {
278     confess "[$command] $result, ";
279   }
280   elsif ($type eq '+') {
281     return $result;
282   }
283   elsif ($type eq '$') {
284     return if $result < 0;
285     return $self->__read_sock($result);
286   }
287   elsif ($type eq '*') {
288     my @list;
289     while ($result--) {
290       push @list, $self->__read_response($command);
291     }
292     return @list;
293   }
294   elsif ($type eq ':') {
295     return $result;
296   }
297   else {
298     confess "unknown answer type: $type ($result), ";
299   }
300 }
301
302 sub __read_sock {
303   my ($self, $len) = @_;
304   my $sock = $self->{sock} || confess("Not connected to any server");
305   my $enc  = $self->{encoding};
306   my $deb  = $self->{debug};
307   my $rbuf = \($self->{rbuf});
308
309   my ($data, $type) = ('', '');
310   my $read_size = $self->{read_size};
311   $read_size = $len + 2 if defined $len && $len + 2 > $read_size;
312
313   while (1) {
314     ## Read NN bytes, strip \r\n at the end
315     if (defined $len) {
316       if (length($$rbuf) >= $len + 2) {
317         $data = substr(substr($$rbuf, 0, $len + 2, ''), 0, -2);
318         last;
319       }
320     }
321     ## No len, means line more, read until \r\n
322     elsif ($$rbuf =~ s/^(.)([^\015\012]*)\015\012//) {
323       ($type, $data) = ($1, $2);
324       last;
325     }
326
327     my $bytes = sysread $sock, $$rbuf, $read_size, length $$rbuf;
328     confess("Error while reading from Redis server: $!")
329       unless defined $bytes;
330     confess("Redis server closed connection") unless $bytes;
331   }
332
333   $data = decode($enc, $data) if $enc;
334   warn "[RECV] '$type$data'" if $self->{debug};
335
336   return ($type, $data) if $type;
337   return $data;
338 }
339
340 sub __can_read_sock {
341   my ($self) = @_;
342   my $sock   = $self->{sock};
343   my $rbuf   = \($self->{rbuf});
344
345   return 1 if $$rbuf;
346   __fh_nonblocking($sock, 1);
347   my $bytes = sysread $sock, $$rbuf, $self->{read_size}, length $$rbuf;
348   __fh_nonblocking($sock, 0);
349   return 1 if $bytes;
350   return 0;
351 }
352
353
354 ### Copied from AnyEvent::Util
355 BEGIN {
356   *__fh_nonblocking = ($^O eq 'MSWin32')
357     ? sub($$) { ioctl $_[0], 0x8004667e, pack "L", $_[1]; }    # FIONBIO
358     : sub($$) { fcntl $_[0], F_SETFL, $_[1] ? O_NONBLOCK : 0; };
359 }
360
361
362 1;
363
364 __END__
365
366 =head1 Connection Handling
367
368 =head2 quit
369
370   $r->quit;
371
372 =head2 ping
373
374   $r->ping || die "no server?";
375
376 =head1 Commands operating on string values
377
378 =head2 set
379
380   $r->set( foo => 'bar' );
381
382   $r->setnx( foo => 42 );
383
384 =head2 get
385
386   my $value = $r->get( 'foo' );
387
388 =head2 mget
389
390   my @values = $r->mget( 'foo', 'bar', 'baz' );
391
392 =head2 incr
393
394   $r->incr('counter');
395
396   $r->incrby('tripplets', 3);
397
398 =head2 decr
399
400   $r->decr('counter');
401
402   $r->decrby('tripplets', 3);
403
404 =head2 exists
405
406   $r->exists( 'key' ) && print "got key!";
407
408 =head2 del
409
410   $r->del( 'key' ) || warn "key doesn't exist";
411
412 =head2 type
413
414   $r->type( 'key' ); # = string
415
416 =head1 Commands operating on the key space
417
418 =head2 keys
419
420   my @keys = $r->keys( '*glob_pattern*' );
421
422 =head2 randomkey
423
424   my $key = $r->randomkey;
425
426 =head2 rename
427
428   my $ok = $r->rename( 'old-key', 'new-key', $new );
429
430 =head2 dbsize
431
432   my $nr_keys = $r->dbsize;
433
434 =head1 Commands operating on lists
435
436 See also L<Redis::List> for tie interface.
437
438 =head2 rpush
439
440   $r->rpush( $key, $value );
441
442 =head2 lpush
443
444   $r->lpush( $key, $value );
445
446 =head2 llen
447
448   $r->llen( $key );
449
450 =head2 lrange
451
452   my @list = $r->lrange( $key, $start, $end );
453
454 =head2 ltrim
455
456   my $ok = $r->ltrim( $key, $start, $end );
457
458 =head2 lindex
459
460   $r->lindex( $key, $index );
461
462 =head2 lset
463
464   $r->lset( $key, $index, $value );
465
466 =head2 lrem
467
468   my $modified_count = $r->lrem( $key, $count, $value );
469
470 =head2 lpop
471
472   my $value = $r->lpop( $key );
473
474 =head2 rpop
475
476   my $value = $r->rpop( $key );
477
478 =head1 Commands operating on sets
479
480 =head2 sadd
481
482   $r->sadd( $key, $member );
483
484 =head2 srem
485
486   $r->srem( $key, $member );
487
488 =head2 scard
489
490   my $elements = $r->scard( $key );
491
492 =head2 sismember
493
494   $r->sismember( $key, $member );
495
496 =head2 sinter
497
498   $r->sinter( $key1, $key2, ... );
499
500 =head2 sinterstore
501
502   my $ok = $r->sinterstore( $dstkey, $key1, $key2, ... );
503
504 =head1 Multiple databases handling commands
505
506 =head2 select
507
508   $r->select( $dbindex ); # 0 for new clients
509
510 =head2 move
511
512   $r->move( $key, $dbindex );
513
514 =head2 flushdb
515
516   $r->flushdb;
517
518 =head2 flushall
519
520   $r->flushall;
521
522 =head1 Sorting
523
524 =head2 sort
525
526   $r->sort("key BY pattern LIMIT start end GET pattern ASC|DESC ALPHA');
527
528 =head1 Persistence control commands
529
530 =head2 save
531
532   $r->save;
533
534 =head2 bgsave
535
536   $r->bgsave;
537
538 =head2 lastsave
539
540   $r->lastsave;
541
542 =head2 shutdown
543
544   $r->shutdown;
545
546 =head1 Remote server control commands
547
548 =head2 info
549
550   my $info_hash = $r->info;
551
552 =head1 ENCODING
553
554 Since Redis knows nothing about encoding, we are forcing utf-8 flag on all data received from Redis.
555 This change is introduced in 1.2001 version.
556
557 This allows us to round-trip utf-8 encoded characters correctly, but might be problem if you push
558 binary junk into Redis and expect to get it back without utf-8 flag turned on.
559
560 =head1 AUTHOR
561
562 Dobrica Pavlinusic, C<< <dpavlin at rot13.org> >>
563
564 =head1 BUGS
565
566 Please report any bugs or feature requests to C<bug-redis at rt.cpan.org>, or through
567 the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Redis>.  I will be notified, and then you'll
568 automatically be notified of progress on your bug as I make changes.
569
570
571
572
573 =head1 SUPPORT
574
575 You can find documentation for this module with the perldoc command.
576
577     perldoc Redis
578         perldoc Redis::List
579         perldoc Redis::Hash
580
581
582 You can also look for information at:
583
584 =over 4
585
586 =item * RT: CPAN's request tracker
587
588 L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Redis>
589
590 =item * AnnoCPAN: Annotated CPAN documentation
591
592 L<http://annocpan.org/dist/Redis>
593
594 =item * CPAN Ratings
595
596 L<http://cpanratings.perl.org/d/Redis>
597
598 =item * Search CPAN
599
600 L<http://search.cpan.org/dist/Redis>
601
602 =back
603
604
605 =head1 ACKNOWLEDGEMENTS
606
607
608 =head1 COPYRIGHT & LICENSE
609
610 Copyright 2009-2010 Dobrica Pavlinusic, all rights reserved.
611
612 This program is free software; you can redistribute it and/or modify it
613 under the same terms as Perl itself.
614
615
616 =cut
617
618 1; # End of Redis