require Carp;
__PACKAGE__->attr(server => '127.0.0.1:4730');
-__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
+__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->new });
__PACKAGE__->attr(error => undef);
-__PACKAGE__->attr(timeout => 300);
+__PACKAGE__->attr(timeout => 5);
__PACKAGE__->attr(encoding => 'UTF-8');
__PACKAGE__->attr(
on_error => sub {
}
}
);
+__PACKAGE__->attr(res => undef);
sub DESTROY {
my $self = shift;
}
);
+ warn "# using gearman server $address:$port\n";
+
return $self;
}
}
my $packet_type = {
+ TEXT => -1, # fake type for text protocol
+
+ CAN_DO => 1,
+
+ PRE_SLEEP => 4,
+
+ NOOP => 6,
+
+ SUBMIT_JOB => 7,
JOB_CREATED => 8,
+ GRAB_JOB => 9,
+ NO_JOB => 10,
+ JOB_ASSIGN => 11,
+
WORK_COMPLETE => 13,
+ WORK_FAIL => 14,
ECHO_REQ => 16,
ECHO_RES => 17,
sub parse_packet {
my ($self,$data) = @_;
+ die "no data in packet" unless $data;
my ($magic, $type, $len) = unpack( "a4NN", $data );
die "wrong magic [$magic]" unless $magic eq "\0RES";
die "unsupported type [$type]" unless exists $nr2type->{$type};
return ( $type, split("\0", substr($data,12,$len)) );
}
-sub res {
- my ( $self, $value ) = @_;
- if ( defined $value ) {
- warn "# ++ res = ",dump $value;
- $self->{_res} ||= $value;
- }
- return $self->{_res};
-}
-
sub req {
my $self = shift;
+warn "XXX req ",dump(@_);
my $type = shift;
+ my $callback = pop @_ if ref $_[$#_] eq 'CODE';
my $data = join("\0", @_);
+ die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
Mojo::Util::encode($self->encoding, $data) if $self->encoding;
$self->{_res} = undef;
my $response;
my $cb = sub {
my ( $self, $data ) = @_;
- $self->ioloop->stop;
- warn "# <<<< ",dump($data);
- my ( $type, @data ) = $self->parse_packet($data);
+
+ if ( substr($data,0,1) ne "\x00" ) {
+ $self->res( $data );
+ $self->stop;
+ return;
+ }
+
+ my ( $type, $handle, @data ) = $self->parse_packet($data);
+ warn "# <<<< ", $nr2type->{$type}, " $handle ",dump(@data);
if ( $type == $packet_type->{JOB_CREATED} ) {
push @{ $self->{_cb_queue} }, sub {
my ( $self,$data ) = @_;
-warn "# WORK_COMPLETE ",dump $data;
- my ( $type, $handle, $ret ) = $self->parse_packet($data);
- die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
- $self->res( $ret );
+ my ( $type, $handle, $out ) = $self->parse_packet($data);
+warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
+ if ( $type == $packet_type->{WORK_COMPLETE} ) {
+ warn "WORK_COMPLETE $handle ", dump $out;
+ } elsif ( $type == $packet_type->{WORK_FAIL} ) {
+ warn "WORK_FAIL $handle ", dump $out;
+ }
+ $self->res( $out );
$self->stop;
};
- $self->start; # FIXME sync client
+ $self->ioloop->timer( $self->timeout => sub {
+ my $self = shift;
+ warn "TIMEOUT $handle ", $self->timeout, "s for result";
+ $self->stop;
+ });
+ } elsif ( $type == $packet_type->{NO_JOB} ) {
+ $self->req( 'PRE_SLEEP' );
+ $self->stop;
+ } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
+ my ( $function, $workload ) = @data;
+ my $callback = $self->{_function}->{$function};
+ die "no $function callback" unless ref $callback eq 'CODE';
+ my $out = $callback->( $workload );
+ warn "## $data $callback = ", dump $out;
+ $self->req( 'WORK_COMPLETE', $handle, $out );
+ $self->req( 'GRAB_JOB' );
+ } elsif ( $type == $packet_type->{NOOP} ) {
+ $self->req( 'GRAB_JOB' );
+ } else {
+ $self->stop;
}
- $self->res( $#data == 0 ? $data[0] : [ @data ] );
+ my $out = $#data == 0 ? $data[0] : [ @data ];
+ $self->res( $out );
+
};
+# $data .= "\0" if $data;
my $len = length($data);
- my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data;
- warn "# >>>> ",dump($data);
+ my $message = $type ne 'TEXT'
+ ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data
+ : "$data\r\n"
+ ;
+ warn "# >>>> $type ",dump($message);
my $mqueue = $self->{_message_queue} ||= [];
my $cqueue = $self->{_cb_queue} ||= [];
$self->connect unless $self->{_connection};
$self->_send_next_message;
- $self->ioloop->start;
+ if ( $type eq 'CAN_DO' ) {
+ $self->{_function}->{$data} = $callback;
+ $self->res( $callback );
+ warn "# installed $data callback $callback";
+ } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands
+ $self->start;
+ }
$self->res;
}
sub start {
my ($self) = @_;
-
+ warn "# start";
$self->ioloop->start;
return $self;
}
sub stop {
my ($self) = @_;
-
+ warn "# stop";
$self->ioloop->stop;
return $self;
}
if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
while (my $message = shift @{$self->{_message_queue}}) {
- warn "# write ",dump($message);
- $self->ioloop->write($c, $message);
+ warn "# write ",dump($message);
+ $self->ioloop->write($c, $message);
}
}
}
sub _on_error {
my ($self, $ioloop, $id, $error) = @_;
+ warn "ERROR: $error";
+
$self->error($error);
$self->_inform_queue;
sub _on_read {
my ($self, $ioloop, $id, $data) = @_;
+ warn "<<<< _on_read ",dump( $id, $data );
+
+ $data = $self->{message}->{$id} .= $data;
+ my ($magic, $type, $len) = unpack( "a4NN", $data );
+
+ if ( substr($data,0,1) ne "\x00" ) {
+ return if $data !~ s/\.[\n\r]$//s;
+ } elsif ( length $data < $len ) {
+ warn "# _on_read incomplete message";
+ return;
+ }
+
+ warn "#### data ",dump($data);
+
my $cb = shift @{$self->{_cb_queue}};
if ($cb) {
- Mojo::Util::decode($self->encoding, $data) if $data;
+# Mojo::Util::decode($self->encoding, $data) if $data; # FIXME
warn "# on read callback with ", dump($data);
$cb->($self, $data);
} else {
warn "no callback";
}
+ delete $self->{message}->{$id};
+
# Reset error after callback dispatching
$self->error(undef);
}
=head2 C<req>
- $gearman->req( $type, $data );
+ $gearman->req( $type, $data, ..., sub { # callback } );
=head2 C<error>