- my $self = shift;
-
- my $sock = $self->{sock} || die "no server connected";
-
- my $command = $AUTOLOAD;
- $command =~ s/.*://;
-
- warn "## $command ",Dumper(@_) if $self->{debug};
-
- my $send;
-
- if ( defined $bulk_command->{$command} ) {
- my $value = pop;
- $value = '' if ! defined $value;
- $send
- = uc($command)
- . ' '
- . join(' ', @_)
- . ' '
- . length( $value )
- . "\r\n$value\r\n"
- ;
- } else {
- $send
- = uc($command)
- . ' '
- . join(' ', @_)
- . "\r\n"
- ;
- }
-
- warn ">> $send" if $self->{debug};
- print $sock $send;
-
- if ( $command eq 'quit' ) {
- close( $sock ) || die "can't close socket: $!";
- return 1;
- }
-
- my $result = <$sock> || die "can't read socket: $!";
- warn "<< $result" if $self->{debug};
- my $type = substr($result,0,1);
- $result = substr($result,1,-2);
-
- if ( $command eq 'info' ) {
- my $hash;
- foreach my $l ( split(/\r\n/, $self->__read_bulk($result) ) ) {
- my ($n,$v) = split(/:/, $l, 2);
- $hash->{$n} = $v;
- }
- return $hash;
- } elsif ( $command eq 'keys' ) {
- my $keys = $self->__read_bulk($result);
- return split(/\s/, $keys) if $keys;
- return;
- }
-
- if ( $type eq '-' ) {
- confess "[$command] $result";
- } elsif ( $type eq '+' ) {
- return $result;
- } elsif ( $type eq '$' ) {
- return $self->__read_bulk($result);
- } elsif ( $type eq '*' ) {
- return $self->__read_multi_bulk($result);
- } elsif ( $type eq ':' ) {
- return $result; # FIXME check if int?
- } else {
- confess "unknown type: $type", $self->__read_line();
- }
+ my $self = shift;
+ my $sock = $self->{sock} || confess("Not connected to any server");
+ my $enc = $self->{encoding};
+ my $deb = $self->{debug};
+
+ my $command = $AUTOLOAD;
+ $command =~ s/.*://;
+ $self->__is_valid_command($command);
+
+ ## PubSub commands use a different answer handling
+ if (my ($pr, $unsub) = $command =~ /^(p)?(un)?subscribe$/i) {
+ $pr = '' unless $pr;
+
+ my $cb = pop;
+ confess("Missing required callback in call to $command(), ")
+ unless ref($cb) eq 'CODE';
+
+ my @subs = @_;
+ @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
+ if $unsub;
+ return unless @subs;
+
+ $self->__send_command($command, @subs);
+
+ my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
+ return $self->__process_subscription_changes($command, \%cbs);
+ }
+
+ $self->__send_command($command, @_);
+ return $self->__read_response($command);
+}
+
+
+### Commands with extra logic
+sub quit {
+ my ($self) = @_;
+
+ $self->__send_command('QUIT');
+
+ delete $self->{rbuf};
+ close(delete $self->{sock}) || confess("Can't close socket: $!");
+
+ return 1;
+}
+
+sub info {
+ my ($self) = @_;
+ $self->__is_valid_command('INFO');
+
+ $self->__send_command('INFO');
+
+ my $info = $self->__read_response('INFO');
+
+ return {map { split(/:/, $_, 2) } split(/\r\n/, $info)};
+}
+
+sub keys {
+ my $self = shift;
+ $self->__is_valid_command('KEYS');
+
+ $self->__send_command('KEYS', @_);
+
+ my @keys = $self->__read_response('KEYS', \my $type);
+ ## Support redis > 1.26
+ return @keys if $type eq '*';
+
+ ## Support redis <= 1.2.6
+ return split(/\s/, $keys[0]) if $keys[0];
+ return;
+}
+
+
+### PubSub
+sub wait_for_messages {
+ my ($self, $timeout) = @_;
+
+ my $s = IO::Select->new;
+ $s->add($self->{sock});
+
+ my $count = 0;
+ while ($s->can_read($timeout)) {
+ while ($self->__can_read_sock) {
+ my @m = $self->__read_response('WAIT_FOR_MESSAGES');
+ $self->__process_pubsub_msg(\@m);
+ $count++;
+ }
+ }
+
+ return $count;
+}
+
+sub __process_unsubscribe_requests {
+ my ($self, $cb, $pr, @unsubs) = @_;
+ my $subs = $self->{subscribers};
+
+ my @subs_to_unsubscribe;
+ for my $sub (@unsubs) {
+ my $key = "${pr}message:$sub";
+ my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
+ next if @$cbs;
+
+ delete $subs->{$key};
+ push @subs_to_unsubscribe, $sub;
+ }
+
+ return @subs_to_unsubscribe;
+}
+
+sub __process_subscription_changes {
+ my ($self, $cmd, $expected) = @_;
+ my $subs = $self->{subscribers};
+
+ while (%$expected) {
+ my @m = $self->__read_response($cmd);
+
+ ## Deal with pending PUBLISH'ed messages
+ if ($m[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg(\@m);
+ next;
+ }
+
+ my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m[1]";
+ my $cb = delete $expected->{$key};
+
+ push @{$subs->{$key}}, $cb unless $unsub;
+
+ $self->{is_subscriber} = $m[2];
+ }
+}
+
+sub __process_pubsub_msg {
+ my ($self, $m) = @_;
+ my $subs = $self->{subscribers};
+
+ my $sub = $m->[1];
+ my $cbid = "$m->[0]:$sub";
+ my $data = pop @$m;
+ my $topic = $m->[2] || $sub;
+
+ if (!exists $subs->{$cbid}) {
+ warn "Message for topic '$topic' ($cbid) without expected callback, ";
+ return;
+ }
+
+ $_->($data, $topic, $sub) for @{$subs->{$cbid}};
+
+ return 1;
+