require Carp;
__PACKAGE__->attr(server => '127.0.0.1:4730');
-__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
+__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->new });
__PACKAGE__->attr(error => undef);
__PACKAGE__->attr(timeout => 300);
__PACKAGE__->attr(encoding => 'UTF-8');
return $self->{_connection};
}
+my $packet_type = {
+ SUBMIT_JOB => 7,
+ JOB_CREATED => 8,
+
+ WORK_COMPLETE => 13,
+
+ ECHO_REQ => 16,
+ ECHO_RES => 17,
+
+ ERROR => 19,
+};
+
+my $nr2type;
+$nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
+
+
+sub parse_packet {
+ my ($self,$data) = @_;
+ my ($magic, $type, $len) = unpack( "a4NN", $data );
+ die "wrong magic [$magic]" unless $magic eq "\0RES";
+ die "unsupported type [$type]" unless exists $nr2type->{$type};
+ die "ERROR" if $type == $packet_type->{ERROR};
+ return ( $type, split("\0", substr($data,12,$len)) );
+}
+
+sub res {
+ my ( $self, $value ) = @_;
+ if ( defined $value ) {
+ warn "# ++ res = ",dump $value;
+ $self->{_res} ||= $value;
+ }
+ return $self->{_res};
+}
+
sub req {
my $self = shift;
my $type = shift;
my $data = join("\0", @_);
+ die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
Mojo::Util::encode($self->encoding, $data) if $self->encoding;
- my $ret;
+ $self->{_res} = undef;
+ my $response;
my $cb = sub {
my ( $self, $data ) = @_;
- $self->ioloop->stop;
warn "# <<<< ",dump($data);
- my ($magic, $type, $len) = unpack( "a4NN", $data );
- die "wrong magic [$magic]" unless $magic eq "\0RES";
- die "ERROR" if $type == 19;
- $ret = substr($data,12,$len);
+ my ( $type, @data ) = $self->parse_packet($data);
+
+ if ( $type == $packet_type->{JOB_CREATED} ) {
+ push @{ $self->{_cb_queue} }, sub {
+ my ( $self,$data ) = @_;
+warn "# WORK_COMPLETE ",dump $data;
+ my ( $type, $handle, $out ) = $self->parse_packet($data);
+ die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
+ $self->res( $out );
+ $self->stop;
+ };
+ } else {
+ $self->stop;
+ }
+
+ my $out = $#data == 0 ? $data[0] : [ @data ];
+ $self->res( $out );
+
};
+ $data .= "\0" if $data;
my $len = length($data);
- my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data;
- warn "# >>>> ",dump($data);
+ my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data;
+ warn "# >>>> ",dump($message);
my $mqueue = $self->{_message_queue} ||= [];
my $cqueue = $self->{_cb_queue} ||= [];
-
push @$mqueue, $message;
push @$cqueue, $cb;
$self->connect unless $self->{_connection};
$self->_send_next_message;
- $self->ioloop->start;
+ $self->start;
- return $ret;
+ $self->res;
}
sub start {
my ($self) = @_;
-
+ warn "# start";
$self->ioloop->start;
return $self;
}
sub stop {
my ($self) = @_;
-
+ warn "# stop";
$self->ioloop->stop;
return $self;
}
if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
while (my $message = shift @{$self->{_message_queue}}) {
- warn "# write ",dump($message);
- $self->ioloop->write($c, $message);
+ warn "# write ",dump($message);
+ $self->ioloop->write($c, $message);
}
}
}
=head2 C<req>
- $gearman->req( $type, $data );
+ $gearman->req( $type, $data, ..., sub { # callback } );
=head2 C<error>