require Carp;
__PACKAGE__->attr(server => '127.0.0.1:4730');
-__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
+__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->new });
__PACKAGE__->attr(error => undef);
__PACKAGE__->attr(timeout => 300);
__PACKAGE__->attr(encoding => 'UTF-8');
}
my $packet_type = {
+ SUBMIT_JOB => 7,
JOB_CREATED => 8,
WORK_COMPLETE => 13,
my $type = shift;
my $data = join("\0", @_);
+ die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
Mojo::Util::encode($self->encoding, $data) if $self->encoding;
$self->{_res} = undef;
my $response;
my $cb = sub {
my ( $self, $data ) = @_;
- $self->ioloop->stop;
warn "# <<<< ",dump($data);
my ( $type, @data ) = $self->parse_packet($data);
push @{ $self->{_cb_queue} }, sub {
my ( $self,$data ) = @_;
warn "# WORK_COMPLETE ",dump $data;
- my ( $type, $handle, $ret ) = $self->parse_packet($data);
+ my ( $type, $handle, $out ) = $self->parse_packet($data);
die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
- $self->res( $ret );
+ $self->res( $out );
$self->stop;
};
- $self->start; # FIXME sync client
+ } else {
+ $self->stop;
}
- $self->res( $#data == 0 ? $data[0] : [ @data ] );
+ my $out = $#data == 0 ? $data[0] : [ @data ];
+ $self->res( $out );
+
};
$data .= "\0" if $data;
my $len = length($data);
- my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data;
- warn "# >>>> ",dump($data);
+ my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data;
+ warn "# >>>> ",dump($message);
my $mqueue = $self->{_message_queue} ||= [];
my $cqueue = $self->{_cb_queue} ||= [];
$self->connect unless $self->{_connection};
$self->_send_next_message;
- $self->ioloop->start;
+ $self->start;
$self->res;
}
sub start {
my ($self) = @_;
-
+ warn "# start";
$self->ioloop->start;
return $self;
}
sub stop {
my ($self) = @_;
-
+ warn "# stop";
$self->ioloop->stop;
return $self;
}
if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
while (my $message = shift @{$self->{_message_queue}}) {
- warn "# write ",dump($message);
- $self->ioloop->write($c, $message);
+ warn "# write ",dump($message);
+ $self->ioloop->write($c, $message);
}
}
}
=head2 C<req>
- $gearman->req( $type, $data );
+ $gearman->req( $type, $data, ..., sub { # callback } );
=head2 C<error>