added timeout for JOB_CREATED
[MojoX-Gearman.git] / lib / MojoX / Gearman.pm
index 25d84c2..341206e 100644 (file)
@@ -16,7 +16,7 @@ require Carp;
 __PACKAGE__->attr(server   => '127.0.0.1:4730');
 __PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->new });
 __PACKAGE__->attr(error        => undef);
-__PACKAGE__->attr(timeout  => 300);
+__PACKAGE__->attr(timeout  => 5);
 __PACKAGE__->attr(encoding => 'UTF-8');
 __PACKAGE__->attr(
        on_error => sub {
@@ -77,6 +77,8 @@ sub connected {
 }
 
 my $packet_type = {
+       TEXT => -1,     # fake type for text protocol
+
        CAN_DO => 1,
 
        PRE_SLEEP => 4,
@@ -128,8 +130,15 @@ warn "XXX req ",dump(@_);
        my $response;
        my $cb = sub {
                my ( $self, $data ) = @_;
-               my ( $type, @data ) = $self->parse_packet($data);
-               warn "# <<<< ", $nr2type->{$type}, " ",dump(@data);
+
+               if ( substr($data,0,1) ne "\x00" ) {
+                       $self->res( $data );
+                       $self->stop;
+                       return;
+               }
+
+               my ( $type, $handle, @data ) = $self->parse_packet($data);
+               warn "# <<<< ", $nr2type->{$type}, " $handle ",dump(@data);
 
                if ( $type == $packet_type->{JOB_CREATED} ) {
                        push @{ $self->{_cb_queue} }, sub {
@@ -144,11 +153,16 @@ warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
                                $self->res( $out );
                                $self->stop;
                        };
+                       $self->ioloop->timer( $self->timeout => sub {
+                               my $self = shift;
+                               warn "TIMEOUT $handle ", $self->timeout, "s for result";
+                               $self->stop;
+                       });
                } elsif ( $type == $packet_type->{NO_JOB} ) {
                        $self->req( 'PRE_SLEEP' );
                        $self->stop;
                } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
-                       my ( $handle, $function, $workload ) = @data;
+                       my ( $function, $workload ) = @data;
                        my $callback = $self->{_function}->{$function};
                        die "no $function callback" unless ref $callback eq 'CODE';
                        my $out = $callback->( $workload );
@@ -168,7 +182,10 @@ warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
 
 #      $data .= "\0" if $data;
        my $len = length($data);
-       my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data;
+       my $message = $type ne 'TEXT'
+               ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data
+               : "$data\r\n"
+       ;
        warn "# >>>> $type ",dump($message);
 
        my $mqueue = $self->{_message_queue} ||= [];
@@ -184,7 +201,7 @@ warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
                $self->{_function}->{$data} = $callback;
                $self->res( $callback );
                warn "# installed $data callback $callback";
-       } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB)$/ ) { # sync commands
+       } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands
                $self->start;
        }
 
@@ -267,7 +284,9 @@ sub _on_read {
        $data = $self->{message}->{$id} .= $data;
        my ($magic, $type, $len) = unpack( "a4NN", $data );
 
-       if ( length $data < $len ) {
+       if ( substr($data,0,1) ne "\x00" ) {
+               return if $data !~ s/\.[\n\r]$//s;
+       } elsif ( length $data < $len ) {
                warn "# _on_read incomplete message";
                return;
        }