}
my $packet_type = {
+ CAN_DO => 1,
+
+ PRE_SLEEP => 4,
+
+ NOOP => 6,
+
SUBMIT_JOB => 7,
JOB_CREATED => 8,
+ GRAB_JOB => 9,
+ NO_JOB => 10,
+ JOB_ASSIGN => 11,
+
WORK_COMPLETE => 13,
ECHO_REQ => 16,
sub req {
my $self = shift;
+warn "XXX req ",dump(@_);
my $type = shift;
+ my $callback = pop @_ if ref $_[$#_] eq 'CODE';
my $data = join("\0", @_);
die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
$self->res( $out );
$self->stop;
};
+ } elsif ( $type == $packet_type->{NO_JOB} ) {
+ $self->req( 'PRE_SLEEP' );
+ } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
+ my ( $handle, $function, $workload ) = @data;
+ my $callback = $self->{_function}->{$function};
+ die "no $function callback" unless ref $callback eq 'CODE';
+ warn "# calling $data callback $callback";
+ my $out = $callback->( $workload );
+ warn "# === ",dump $out;
+ $self->req( 'WORK_COMPLETE', $handle, $out );
+ $self->req( 'GRAB_JOB' );
+ } elsif ( $type == $packet_type->{NOOP} ) {
+ $self->req( 'GRAB_JOB' );
} else {
$self->stop;
}
$self->connect unless $self->{_connection};
$self->_send_next_message;
- $self->start;
+ if ( $type eq 'CAN_DO' ) {
+ $self->{_function}->{$data} = $callback;
+ warn "# installed $data callback $callback";
+ $self->req( 'GRAB_JOB' );
+ }
+
+
+# $self->start;
$self->res;
}
--- /dev/null
+#!/usr/bin/env perl
+use warnings;
+use strict;
+
+use Test::More tests => 7;
+use Data::Dump qw(dump);
+use lib 'lib';
+
+use_ok 'MojoX::Gearman';
+
+my $g = new_ok 'MojoX::Gearman';
+
+my $name = "mojo_g";
+
+ok( my $echo = $g->req( 'CAN_DO', $name, sub {
+ my $payload = shift;
+ warn "DO $name ", dump($payload), $/;
+ return $payload + 1;
+}), 'CAN_DO' );
+diag $echo;
+
+$g->start;
+
+diag "press <enter> to finish";
+<STDIN>;
+