+### PubSub
+sub wait_for_messages {
+ my ($self, $timeout) = @_;
+
+ my $s = IO::Select->new;
+ $s->add($self->{sock});
+
+ my $count = 0;
+ while ($s->can_read($timeout)) {
+ while ($self->__can_read_sock) {
+ my @m = $self->__read_response('WAIT_FOR_MESSAGES');
+ $self->__process_pubsub_msg(\@m);
+ $count++;
+ }
+ }
+
+ return $count;
+}
+
+sub __process_unsubscribe_requests {
+ my ($self, $cb, $pr, @unsubs) = @_;
+ my $subs = $self->{subscribers};
+
+ my @subs_to_unsubscribe;
+ for my $sub (@unsubs) {
+ my $key = "${pr}message:$sub";
+ my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{$subs->{$key}}];
+ next if @$cbs;
+
+ delete $subs->{$key};
+ push @subs_to_unsubscribe, $sub;
+ }
+
+ return @subs_to_unsubscribe;
+}
+
+sub __process_subscription_changes {
+ my ($self, $cmd, $expected) = @_;
+ my $subs = $self->{subscribers};
+
+ while (%$expected) {
+ my @m = $self->__read_response($cmd);
+
+ ## Deal with pending PUBLISH'ed messages
+ if ($m[0] =~ /^p?message$/) {
+ $self->__process_pubsub_msg(\@m);
+ next;
+ }
+
+ my ($key, $unsub) = $m[0] =~ m/^(p)?(un)?subscribe$/;
+ $key .= "message:$m[1]";
+ my $cb = delete $expected->{$key};
+
+ push @{$subs->{$key}}, $cb unless $unsub;
+
+ $self->{is_subscriber} = $m[2];
+ }
+}
+
+sub __process_pubsub_msg {
+ my ($self, $m) = @_;
+ my $subs = $self->{subscribers};
+
+ my $sub = $m->[1];
+ my $cbid = "$m->[0]:$sub";
+ my $data = pop @$m;
+ my $topic = $m->[2] || $sub;
+
+ if (!exists $subs->{$cbid}) {
+ warn "Message for topic '$topic' ($cbid) without expected callback, ";
+ return;
+ }
+
+ $_->($data, $topic, $sub) for @{$subs->{$cbid}};
+
+ return 1;
+
+}
+
+