X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FMojoX%2FGearman.pm;h=af521b8858d78d8cb39b811e91a7c77f2144235a;hb=b6a43b72bd4875bfc0c48c9a1f484a5fe56e2b9f;hp=4448b1c7a5a477efbbeacb66074785fb7d000882;hpb=9405354b7db8a3fdf6047c80c1aa573df31884b3;p=MojoX-Gearman.git diff --git a/lib/MojoX/Gearman.pm b/lib/MojoX/Gearman.pm index 4448b1c..af521b8 100644 --- a/lib/MojoX/Gearman.pm +++ b/lib/MojoX/Gearman.pm @@ -14,7 +14,7 @@ use Data::Dump qw(dump); require Carp; __PACKAGE__->attr(server => '127.0.0.1:4730'); -__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton }); +__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->new }); __PACKAGE__->attr(error => undef); __PACKAGE__->attr(timeout => 300); __PACKAGE__->attr(encoding => 'UTF-8'); @@ -73,54 +73,103 @@ sub connected { return $self->{_connection}; } +my $packet_type = { + SUBMIT_JOB => 7, + JOB_CREATED => 8, + + WORK_COMPLETE => 13, + + ECHO_REQ => 16, + ECHO_RES => 17, + + ERROR => 19, +}; + +my $nr2type; +$nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type; + + +sub parse_packet { + my ($self,$data) = @_; + my ($magic, $type, $len) = unpack( "a4NN", $data ); + die "wrong magic [$magic]" unless $magic eq "\0RES"; + die "unsupported type [$type]" unless exists $nr2type->{$type}; + die "ERROR" if $type == $packet_type->{ERROR}; + return ( $type, split("\0", substr($data,12,$len)) ); +} + +sub res { + my ( $self, $value ) = @_; + if ( defined $value ) { + warn "# ++ res = ",dump $value; + $self->{_res} ||= $value; + } + return $self->{_res}; +} + sub req { my $self = shift; my $type = shift; my $data = join("\0", @_); + die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type}; Mojo::Util::encode($self->encoding, $data) if $self->encoding; - my $ret; + $self->{_res} = undef; + my $response; my $cb = sub { my ( $self, $data ) = @_; - $self->ioloop->stop; warn "# <<<< ",dump($data); - my ($magic, $type, $len) = unpack( "a4NN", $data ); - die "wrong magic [$magic]" unless $magic eq "\0RES"; - die "ERROR" if $type == 19; - $ret = substr($data,12,$len); + my ( $type, @data ) = $self->parse_packet($data); + + if ( $type == $packet_type->{JOB_CREATED} ) { + push @{ $self->{_cb_queue} }, sub { + my ( $self,$data ) = @_; +warn "# WORK_COMPLETE ",dump $data; + my ( $type, $handle, $out ) = $self->parse_packet($data); + die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE}; + $self->res( $out ); + $self->stop; + }; + } else { + $self->stop; + } + + my $out = $#data == 0 ? $data[0] : [ @data ]; + $self->res( $out ); + }; + $data .= "\0" if $data; my $len = length($data); - my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data; - warn "# >>>> ",dump($data); + my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data; + warn "# >>>> ",dump($message); my $mqueue = $self->{_message_queue} ||= []; my $cqueue = $self->{_cb_queue} ||= []; - push @$mqueue, $message; push @$cqueue, $cb; $self->connect unless $self->{_connection}; $self->_send_next_message; - $self->ioloop->start; + $self->start; - return $ret; + $self->res; } sub start { my ($self) = @_; - + warn "# start"; $self->ioloop->start; return $self; } sub stop { my ($self) = @_; - + warn "# stop"; $self->ioloop->stop; return $self; } @@ -130,8 +179,8 @@ sub _send_next_message { if ((my $c = $self->{_connection}) && !$self->{_connecting}) { while (my $message = shift @{$self->{_message_queue}}) { - warn "# write ",dump($message); - $self->ioloop->write($c, $message); + warn "# write ",dump($message); + $self->ioloop->write($c, $message); } } } @@ -248,7 +297,7 @@ Encoding used for stored data, defaults to C. =head2 C - $gearman->req( $type, $data ); + $gearman->req( $type, $data, ..., sub { # callback } ); =head2 C