sync SUBMIT_JOB somewhat working
authorDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 1 Mar 2011 11:00:54 +0000 (11:00 +0000)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 1 Mar 2011 11:00:54 +0000 (11:00 +0000)
lib/MojoX/Gearman.pm
t/gearman.t

index 4448b1c..bc1b1f1 100644 (file)
@@ -73,6 +73,39 @@ sub connected {
        return $self->{_connection};
 }
 
+my $packet_type = {
+       JOB_CREATED => 8,
+
+       WORK_COMPLETE => 13,
+
+       ECHO_REQ => 16,
+       ECHO_RES => 17,
+
+       ERROR => 19,
+};
+
+my $nr2type;
+$nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
+
+
+sub parse_packet {
+       my ($self,$data) = @_;
+       my ($magic, $type, $len) = unpack( "a4NN", $data );
+       die "wrong magic [$magic]" unless $magic eq "\0RES";
+       die "unsupported type [$type]" unless exists $nr2type->{$type};
+       die "ERROR" if $type == $packet_type->{ERROR};
+       return ( $type, split("\0", substr($data,12,$len)) );
+}
+
+sub res {
+       my ( $self, $value ) = @_;
+       if ( defined $value ) {
+               warn "# ++ res = ",dump $value;
+               $self->{_res} ||= $value;
+       }
+       return $self->{_res};
+}
+
 sub req {
        my $self = shift;
        my $type = shift;
@@ -80,16 +113,28 @@ sub req {
 
        Mojo::Util::encode($self->encoding, $data) if $self->encoding;
 
-       my $ret;
+       $self->{_res} = undef;
 
+       my $response;
        my $cb = sub {
                my ( $self, $data ) = @_;
                $self->ioloop->stop;
                warn "# <<<< ",dump($data);
-               my ($magic, $type, $len) = unpack( "a4NN", $data );
-               die "wrong magic [$magic]" unless $magic eq "\0RES";
-               die "ERROR" if $type == 19;
-               $ret = substr($data,12,$len);
+               my ( $type, @data ) = $self->parse_packet($data);
+
+               if ( $type == $packet_type->{JOB_CREATED} ) {
+                       push @{ $self->{_cb_queue} }, sub {
+                               my ( $self,$data ) = @_;
+warn "# WORK_COMPLETE ",dump $data;
+                               my ( $type, $handle, $ret ) = $self->parse_packet($data);
+                               die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
+                               $self->res( $ret );
+                               $self->stop;
+                       };
+                       $self->start; # FIXME sync client
+               }
+
+               $self->res( $#data == 0 ? $data[0] : [ @data ] );
        };
 
        my $len = length($data);
@@ -99,7 +144,6 @@ sub req {
        my $mqueue = $self->{_message_queue} ||= [];
        my $cqueue = $self->{_cb_queue}   ||= [];
 
-
        push @$mqueue, $message;
        push @$cqueue, $cb;
 
@@ -108,7 +152,7 @@ sub req {
 
        $self->ioloop->start;
 
-       return $ret;
+       $self->res;
 }
 
 sub start {
index 9a3c60c..0e2676f 100755 (executable)
@@ -2,12 +2,17 @@
 use warnings;
 use strict;
 
-use Test::More tests => 4;
+use Test::More tests => 5;
+use Data::Dump qw(dump);
 use lib 'lib';
 
 use_ok 'MojoX::Gearman';
 
 my $g = new_ok 'MojoX::Gearman';
 
-ok( my $echo = $g->req( 16, "foobar" ), 'echo' );
+ok( my $echo = $g->req( 16, "foobar" ), 'ECHO' );
 cmp_ok $echo, 'eq', "foobar";
+
+ok( my $ping = $g->req( 7, 'ping', '', 'bla' ), 'SUBMIT_JOB' );
+diag dump $ping;
+