first worker
authorDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 1 Mar 2011 22:09:38 +0000 (22:09 +0000)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 1 Mar 2011 22:10:35 +0000 (22:10 +0000)
lib/MojoX/Gearman.pm
t/gearman-worker.t [new file with mode: 0755]

index a7692fd..572af92 100644 (file)
@@ -75,9 +75,19 @@ sub connected {
 }
 
 my $packet_type = {
+       CAN_DO => 1,
+
+       PRE_SLEEP => 4,
+
+       NOOP => 6,
+
        SUBMIT_JOB => 7,
        JOB_CREATED => 8,
 
+       GRAB_JOB => 9,
+       NO_JOB => 10,
+       JOB_ASSIGN => 11,
+
        WORK_COMPLETE => 13,
 
        ECHO_REQ => 16,
@@ -102,7 +112,9 @@ sub parse_packet {
 
 sub req {
        my $self = shift;
+warn "XXX req ",dump(@_);
        my $type = shift;
+       my $callback = pop @_ if ref $_[$#_] eq 'CODE';
        my $data = join("\0", @_);
 
        die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
@@ -125,6 +137,19 @@ warn "# WORK_COMPLETE ",dump $data;
                                $self->res( $out );
                                $self->stop;
                        };
+               } elsif ( $type == $packet_type->{NO_JOB} ) {
+                       $self->req( 'PRE_SLEEP' );
+               } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
+                       my ( $handle, $function, $workload ) = @data;
+                       my $callback = $self->{_function}->{$function};
+                       die "no $function callback" unless ref $callback eq 'CODE';
+                       warn "# calling $data callback $callback";
+                       my $out = $callback->( $workload );
+                       warn "# === ",dump $out;
+                       $self->req( 'WORK_COMPLETE', $handle, $out );
+                       $self->req( 'GRAB_JOB' );
+               } elsif ( $type == $packet_type->{NOOP} ) {
+                       $self->req( 'GRAB_JOB' );
                } else {
                        $self->stop;
                }
@@ -148,7 +173,14 @@ warn "# WORK_COMPLETE ",dump $data;
        $self->connect unless $self->{_connection};
        $self->_send_next_message;
 
-       $self->start;
+       if ( $type eq 'CAN_DO' ) {
+               $self->{_function}->{$data} = $callback;
+               warn "# installed $data callback $callback";
+               $self->req( 'GRAB_JOB' );
+       }
+               
+
+#      $self->start;
 
        $self->res;
 }
diff --git a/t/gearman-worker.t b/t/gearman-worker.t
new file mode 100755 (executable)
index 0000000..dc06527
--- /dev/null
@@ -0,0 +1,26 @@
+#!/usr/bin/env perl
+use warnings;
+use strict;
+
+use Test::More tests => 7;
+use Data::Dump qw(dump);
+use lib 'lib';
+
+use_ok 'MojoX::Gearman';
+
+my $g = new_ok 'MojoX::Gearman';
+
+my $name = "mojo_g";
+
+ok( my $echo = $g->req( 'CAN_DO', $name, sub {
+       my $payload = shift;
+       warn "DO $name ", dump($payload), $/;
+       return $payload + 1;
+}), 'CAN_DO' );
+diag $echo;
+
+$g->start;
+
+diag "press <enter> to finish";
+<STDIN>;
+