added timeout for JOB_CREATED
[MojoX-Gearman.git] / lib / MojoX / Gearman.pm
index bc1b1f1..341206e 100644 (file)
@@ -14,9 +14,9 @@ use Data::Dump qw(dump);
 require Carp;
 
 __PACKAGE__->attr(server   => '127.0.0.1:4730');
-__PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->singleton });
+__PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->new });
 __PACKAGE__->attr(error        => undef);
-__PACKAGE__->attr(timeout  => 300);
+__PACKAGE__->attr(timeout  => 5);
 __PACKAGE__->attr(encoding => 'UTF-8');
 __PACKAGE__->attr(
        on_error => sub {
@@ -26,6 +26,7 @@ __PACKAGE__->attr(
                  }
        }
 );
+__PACKAGE__->attr(res => undef);
 
 sub DESTROY {
        my $self = shift;
@@ -64,6 +65,8 @@ sub connect {
                }
        );
 
+       warn "# using gearman server $address:$port\n";
+
        return $self;
 }
 
@@ -74,9 +77,23 @@ sub connected {
 }
 
 my $packet_type = {
+       TEXT => -1,     # fake type for text protocol
+
+       CAN_DO => 1,
+
+       PRE_SLEEP => 4,
+
+       NOOP => 6,
+
+       SUBMIT_JOB => 7,
        JOB_CREATED => 8,
 
+       GRAB_JOB => 9,
+       NO_JOB => 10,
+       JOB_ASSIGN => 11,
+
        WORK_COMPLETE => 13,
+       WORK_FAIL => 14,
 
        ECHO_REQ => 16,
        ECHO_RES => 17,
@@ -90,6 +107,7 @@ $nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
 
 sub parse_packet {
        my ($self,$data) = @_;
+       die "no data in packet" unless $data;
        my ($magic, $type, $len) = unpack( "a4NN", $data );
        die "wrong magic [$magic]" unless $magic eq "\0RES";
        die "unsupported type [$type]" unless exists $nr2type->{$type};
@@ -97,20 +115,14 @@ sub parse_packet {
        return ( $type, split("\0", substr($data,12,$len)) );
 }
 
-sub res {
-       my ( $self, $value ) = @_;
-       if ( defined $value ) {
-               warn "# ++ res = ",dump $value;
-               $self->{_res} ||= $value;
-       }
-       return $self->{_res};
-}
-
 sub req {
        my $self = shift;
+warn "XXX req ",dump(@_);
        my $type = shift;
+       my $callback = pop @_ if ref $_[$#_] eq 'CODE';
        my $data = join("\0", @_);
 
+       die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
        Mojo::Util::encode($self->encoding, $data) if $self->encoding;
 
        $self->{_res} = undef;
@@ -118,28 +130,63 @@ sub req {
        my $response;
        my $cb = sub {
                my ( $self, $data ) = @_;
-               $self->ioloop->stop;
-               warn "# <<<< ",dump($data);
-               my ( $type, @data ) = $self->parse_packet($data);
+
+               if ( substr($data,0,1) ne "\x00" ) {
+                       $self->res( $data );
+                       $self->stop;
+                       return;
+               }
+
+               my ( $type, $handle, @data ) = $self->parse_packet($data);
+               warn "# <<<< ", $nr2type->{$type}, " $handle ",dump(@data);
 
                if ( $type == $packet_type->{JOB_CREATED} ) {
                        push @{ $self->{_cb_queue} }, sub {
                                my ( $self,$data ) = @_;
-warn "# WORK_COMPLETE ",dump $data;
-                               my ( $type, $handle, $ret ) = $self->parse_packet($data);
-                               die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
-                               $self->res( $ret );
+                               my ( $type, $handle, $out ) = $self->parse_packet($data);
+warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
+                               if ( $type == $packet_type->{WORK_COMPLETE} ) {
+                                       warn "WORK_COMPLETE $handle ", dump $out;
+                               } elsif ( $type == $packet_type->{WORK_FAIL} ) {
+                                       warn "WORK_FAIL $handle ", dump $out;
+                               }
+                               $self->res( $out );
                                $self->stop;
                        };
-                       $self->start; # FIXME sync client
+                       $self->ioloop->timer( $self->timeout => sub {
+                               my $self = shift;
+                               warn "TIMEOUT $handle ", $self->timeout, "s for result";
+                               $self->stop;
+                       });
+               } elsif ( $type == $packet_type->{NO_JOB} ) {
+                       $self->req( 'PRE_SLEEP' );
+                       $self->stop;
+               } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
+                       my ( $function, $workload ) = @data;
+                       my $callback = $self->{_function}->{$function};
+                       die "no $function callback" unless ref $callback eq 'CODE';
+                       my $out = $callback->( $workload );
+                       warn "## $data $callback = ", dump $out;
+                       $self->req( 'WORK_COMPLETE', $handle, $out );
+                       $self->req( 'GRAB_JOB' );
+               } elsif ( $type == $packet_type->{NOOP} ) {
+                       $self->req( 'GRAB_JOB' );
+               } else {
+                       $self->stop;
                }
 
-               $self->res( $#data == 0 ? $data[0] : [ @data ] );
+               my $out = $#data == 0 ? $data[0] : [ @data ];
+               $self->res( $out );
+
        };
 
+#      $data .= "\0" if $data;
        my $len = length($data);
-       my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data;
-       warn "# >>>> ",dump($data);
+       my $message = $type ne 'TEXT'
+               ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data
+               : "$data\r\n"
+       ;
+       warn "# >>>> $type ",dump($message);
 
        my $mqueue = $self->{_message_queue} ||= [];
        my $cqueue = $self->{_cb_queue}   ||= [];
@@ -150,21 +197,27 @@ warn "# WORK_COMPLETE ",dump $data;
        $self->connect unless $self->{_connection};
        $self->_send_next_message;
 
-       $self->ioloop->start;
+       if ( $type eq 'CAN_DO' ) {
+               $self->{_function}->{$data} = $callback;
+               $self->res( $callback );
+               warn "# installed $data callback $callback";
+       } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands
+               $self->start;
+       }
 
        $self->res;
 }
 
 sub start {
        my ($self) = @_;
-
+       warn "# start";
        $self->ioloop->start;
        return $self;
 }
 
 sub stop {
        my ($self) = @_;
-
+       warn "# stop";
        $self->ioloop->stop;
        return $self;
 }
@@ -174,8 +227,8 @@ sub _send_next_message {
 
        if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
                while (my $message = shift @{$self->{_message_queue}}) {
-               warn "# write ",dump($message);
-               $self->ioloop->write($c, $message);
+                       warn "# write ",dump($message);
+                       $self->ioloop->write($c, $message);
                }
        }
 }
@@ -192,6 +245,8 @@ sub _on_connect {
 sub _on_error {
        my ($self, $ioloop, $id, $error) = @_;
 
+       warn "ERROR: $error";
+
        $self->error($error);
        $self->_inform_queue;
 
@@ -224,15 +279,31 @@ sub _inform_queue {
 sub _on_read {
        my ($self, $ioloop, $id, $data) = @_;
 
+       warn "<<<< _on_read ",dump( $id, $data );
+
+       $data = $self->{message}->{$id} .= $data;
+       my ($magic, $type, $len) = unpack( "a4NN", $data );
+
+       if ( substr($data,0,1) ne "\x00" ) {
+               return if $data !~ s/\.[\n\r]$//s;
+       } elsif ( length $data < $len ) {
+               warn "# _on_read incomplete message";
+               return;
+       }
+
+       warn "#### data ",dump($data);
+
        my $cb = shift @{$self->{_cb_queue}};
        if ($cb) {
-               Mojo::Util::decode($self->encoding, $data) if $data;
+#              Mojo::Util::decode($self->encoding, $data) if $data; # FIXME
                warn "# on read callback with ", dump($data);
                $cb->($self, $data);
        } else {
                warn "no callback";
        }
 
+       delete $self->{message}->{$id};
+
        # Reset error after callback dispatching
        $self->error(undef);
 }
@@ -292,7 +363,7 @@ Encoding used for stored data, defaults to C<UTF-8>.
 
 =head2 C<req>
 
-       $gearman->req( $type, $data );
+       $gearman->req( $type, $data, ..., sub { # callback } );
 
 =head2 C<error>