X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FMojoX%2FGearman.pm;h=341206e6e33ef84734d5133ef7baef94a3f34aed;hb=104aad3ef2f6678a66c366d1819bb04d8f4b11f1;hp=4448b1c7a5a477efbbeacb66074785fb7d000882;hpb=9405354b7db8a3fdf6047c80c1aa573df31884b3;p=MojoX-Gearman.git diff --git a/lib/MojoX/Gearman.pm b/lib/MojoX/Gearman.pm index 4448b1c..341206e 100644 --- a/lib/MojoX/Gearman.pm +++ b/lib/MojoX/Gearman.pm @@ -14,9 +14,9 @@ use Data::Dump qw(dump); require Carp; __PACKAGE__->attr(server => '127.0.0.1:4730'); -__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton }); +__PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->new }); __PACKAGE__->attr(error => undef); -__PACKAGE__->attr(timeout => 300); +__PACKAGE__->attr(timeout => 5); __PACKAGE__->attr(encoding => 'UTF-8'); __PACKAGE__->attr( on_error => sub { @@ -26,6 +26,7 @@ __PACKAGE__->attr( } } ); +__PACKAGE__->attr(res => undef); sub DESTROY { my $self = shift; @@ -64,6 +65,8 @@ sub connect { } ); + warn "# using gearman server $address:$port\n"; + return $self; } @@ -73,54 +76,148 @@ sub connected { return $self->{_connection}; } +my $packet_type = { + TEXT => -1, # fake type for text protocol + + CAN_DO => 1, + + PRE_SLEEP => 4, + + NOOP => 6, + + SUBMIT_JOB => 7, + JOB_CREATED => 8, + + GRAB_JOB => 9, + NO_JOB => 10, + JOB_ASSIGN => 11, + + WORK_COMPLETE => 13, + WORK_FAIL => 14, + + ECHO_REQ => 16, + ECHO_RES => 17, + + ERROR => 19, +}; + +my $nr2type; +$nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type; + + +sub parse_packet { + my ($self,$data) = @_; + die "no data in packet" unless $data; + my ($magic, $type, $len) = unpack( "a4NN", $data ); + die "wrong magic [$magic]" unless $magic eq "\0RES"; + die "unsupported type [$type]" unless exists $nr2type->{$type}; + die "ERROR" if $type == $packet_type->{ERROR}; + return ( $type, split("\0", substr($data,12,$len)) ); +} + sub req { my $self = shift; +warn "XXX req ",dump(@_); my $type = shift; + my $callback = pop @_ if ref $_[$#_] eq 'CODE'; my $data = join("\0", @_); + die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type}; Mojo::Util::encode($self->encoding, $data) if $self->encoding; - my $ret; + $self->{_res} = undef; + my $response; my $cb = sub { my ( $self, $data ) = @_; - $self->ioloop->stop; - warn "# <<<< ",dump($data); - my ($magic, $type, $len) = unpack( "a4NN", $data ); - die "wrong magic [$magic]" unless $magic eq "\0RES"; - die "ERROR" if $type == 19; - $ret = substr($data,12,$len); + + if ( substr($data,0,1) ne "\x00" ) { + $self->res( $data ); + $self->stop; + return; + } + + my ( $type, $handle, @data ) = $self->parse_packet($data); + warn "# <<<< ", $nr2type->{$type}, " $handle ",dump(@data); + + if ( $type == $packet_type->{JOB_CREATED} ) { + push @{ $self->{_cb_queue} }, sub { + my ( $self,$data ) = @_; + my ( $type, $handle, $out ) = $self->parse_packet($data); +warn "# <<<< ",$nr2type->{$type}, " ",dump $data; + if ( $type == $packet_type->{WORK_COMPLETE} ) { + warn "WORK_COMPLETE $handle ", dump $out; + } elsif ( $type == $packet_type->{WORK_FAIL} ) { + warn "WORK_FAIL $handle ", dump $out; + } + $self->res( $out ); + $self->stop; + }; + $self->ioloop->timer( $self->timeout => sub { + my $self = shift; + warn "TIMEOUT $handle ", $self->timeout, "s for result"; + $self->stop; + }); + } elsif ( $type == $packet_type->{NO_JOB} ) { + $self->req( 'PRE_SLEEP' ); + $self->stop; + } elsif ( $type == $packet_type->{JOB_ASSIGN} ) { + my ( $function, $workload ) = @data; + my $callback = $self->{_function}->{$function}; + die "no $function callback" unless ref $callback eq 'CODE'; + my $out = $callback->( $workload ); + warn "## $data $callback = ", dump $out; + $self->req( 'WORK_COMPLETE', $handle, $out ); + $self->req( 'GRAB_JOB' ); + } elsif ( $type == $packet_type->{NOOP} ) { + $self->req( 'GRAB_JOB' ); + } else { + $self->stop; + } + + my $out = $#data == 0 ? $data[0] : [ @data ]; + $self->res( $out ); + }; +# $data .= "\0" if $data; my $len = length($data); - my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data; - warn "# >>>> ",dump($data); + my $message = $type ne 'TEXT' + ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data + : "$data\r\n" + ; + warn "# >>>> $type ",dump($message); my $mqueue = $self->{_message_queue} ||= []; my $cqueue = $self->{_cb_queue} ||= []; - push @$mqueue, $message; push @$cqueue, $cb; $self->connect unless $self->{_connection}; $self->_send_next_message; - $self->ioloop->start; + if ( $type eq 'CAN_DO' ) { + $self->{_function}->{$data} = $callback; + $self->res( $callback ); + warn "# installed $data callback $callback"; + } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands + $self->start; + } - return $ret; + $self->res; } sub start { my ($self) = @_; - + warn "# start"; $self->ioloop->start; return $self; } sub stop { my ($self) = @_; - + warn "# stop"; $self->ioloop->stop; return $self; } @@ -130,8 +227,8 @@ sub _send_next_message { if ((my $c = $self->{_connection}) && !$self->{_connecting}) { while (my $message = shift @{$self->{_message_queue}}) { - warn "# write ",dump($message); - $self->ioloop->write($c, $message); + warn "# write ",dump($message); + $self->ioloop->write($c, $message); } } } @@ -148,6 +245,8 @@ sub _on_connect { sub _on_error { my ($self, $ioloop, $id, $error) = @_; + warn "ERROR: $error"; + $self->error($error); $self->_inform_queue; @@ -180,15 +279,31 @@ sub _inform_queue { sub _on_read { my ($self, $ioloop, $id, $data) = @_; + warn "<<<< _on_read ",dump( $id, $data ); + + $data = $self->{message}->{$id} .= $data; + my ($magic, $type, $len) = unpack( "a4NN", $data ); + + if ( substr($data,0,1) ne "\x00" ) { + return if $data !~ s/\.[\n\r]$//s; + } elsif ( length $data < $len ) { + warn "# _on_read incomplete message"; + return; + } + + warn "#### data ",dump($data); + my $cb = shift @{$self->{_cb_queue}}; if ($cb) { - Mojo::Util::decode($self->encoding, $data) if $data; +# Mojo::Util::decode($self->encoding, $data) if $data; # FIXME warn "# on read callback with ", dump($data); $cb->($self, $data); } else { warn "no callback"; } + delete $self->{message}->{$id}; + # Reset error after callback dispatching $self->error(undef); } @@ -248,7 +363,7 @@ Encoding used for stored data, defaults to C. =head2 C - $gearman->req( $type, $data ); + $gearman->req( $type, $data, ..., sub { # callback } ); =head2 C