1 package MojoX::Gearman;
13 use Data::Dump qw(dump);
16 __PACKAGE__->attr(server => '127.0.0.1:4730');
17 __PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->new });
18 __PACKAGE__->attr(error => undef);
19 __PACKAGE__->attr(timeout => 300);
20 __PACKAGE__->attr(encoding => 'UTF-8');
25 warn "Gearman error: ", $gearman->error, "\n";
29 __PACKAGE__->attr(res => undef);
35 return unless my $loop = $self->ioloop;
38 $loop->drop($self->{_connection})
39 if $self->{_connection};
46 if ($self->connected) {
47 $self->ioloop->drop($self->{_connection});
50 $self->server =~ m{^([^:]+)(:(\d+))?};
52 my $port = $3 || 4730;
54 Scalar::Util::weaken $self;
57 $self->{_connecting} = 1;
58 $self->{_connection} = $self->ioloop->connect(
59 { address => $address,
61 on_connect => sub { $self->_on_connect(@_) },
62 on_read => sub { $self->_on_read(@_) },
63 on_error => sub { $self->_on_error(@_) },
64 on_hup => sub { $self->_on_hup(@_) },
68 warn "# using gearman server $address:$port\n";
76 return $self->{_connection};
80 TEXT => -1, # fake type for text protocol
105 $nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
109 my ($self,$data) = @_;
110 die "no data in packet" unless $data;
111 my ($magic, $type, $len) = unpack( "a4NN", $data );
112 die "wrong magic [$magic]" unless $magic eq "\0RES";
113 die "unsupported type [$type]" unless exists $nr2type->{$type};
114 die "ERROR" if $type == $packet_type->{ERROR};
115 return ( $type, split("\0", substr($data,12,$len)) );
120 warn "XXX req ",dump(@_);
122 my $callback = pop @_ if ref $_[$#_] eq 'CODE';
123 my $data = join("\0", @_);
125 die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
126 Mojo::Util::encode($self->encoding, $data) if $self->encoding;
128 $self->{_res} = undef;
132 my ( $self, $data ) = @_;
134 if ( substr($data,0,1) ne "\x00" ) {
140 my ( $type, @data ) = $self->parse_packet($data);
141 warn "# <<<< ", $nr2type->{$type}, " ",dump(@data);
143 if ( $type == $packet_type->{JOB_CREATED} ) {
144 push @{ $self->{_cb_queue} }, sub {
145 my ( $self,$data ) = @_;
146 my ( $type, $handle, $out ) = $self->parse_packet($data);
147 warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
148 if ( $type == $packet_type->{WORK_COMPLETE} ) {
149 warn "WORK_COMPLETE $handle ", dump $out;
150 } elsif ( $type == $packet_type->{WORK_FAIL} ) {
151 warn "WORK_FAIL $handle ", dump $out;
156 } elsif ( $type == $packet_type->{NO_JOB} ) {
157 $self->req( 'PRE_SLEEP' );
159 } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
160 my ( $handle, $function, $workload ) = @data;
161 my $callback = $self->{_function}->{$function};
162 die "no $function callback" unless ref $callback eq 'CODE';
163 my $out = $callback->( $workload );
164 warn "## $data $callback = ", dump $out;
165 $self->req( 'WORK_COMPLETE', $handle, $out );
166 $self->req( 'GRAB_JOB' );
167 } elsif ( $type == $packet_type->{NOOP} ) {
168 $self->req( 'GRAB_JOB' );
173 my $out = $#data == 0 ? $data[0] : [ @data ];
178 # $data .= "\0" if $data;
179 my $len = length($data);
180 my $message = $type ne 'TEXT'
181 ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data
184 warn "# >>>> $type ",dump($message);
186 my $mqueue = $self->{_message_queue} ||= [];
187 my $cqueue = $self->{_cb_queue} ||= [];
189 push @$mqueue, $message;
192 $self->connect unless $self->{_connection};
193 $self->_send_next_message;
195 if ( $type eq 'CAN_DO' ) {
196 $self->{_function}->{$data} = $callback;
197 $self->res( $callback );
198 warn "# installed $data callback $callback";
199 } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands
209 $self->ioloop->start;
220 sub _send_next_message {
223 if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
224 while (my $message = shift @{$self->{_message_queue}}) {
225 warn "# write ",dump($message);
226 $self->ioloop->write($c, $message);
232 my ($self, $ioloop, $id) = @_;
233 delete $self->{_connecting};
235 $ioloop->connection_timeout($id => $self->timeout);
237 $self->_send_next_message;
241 my ($self, $ioloop, $id, $error) = @_;
243 warn "ERROR: $error";
245 $self->error($error);
246 $self->_inform_queue;
248 $self->on_error->($self);
254 my ($self, $ioloop, $id) = @_;
256 $self->{error} ||= 'disconnected';
257 $self->_inform_queue;
259 delete $self->{_message_queue};
261 delete $self->{_connecting};
262 delete $self->{_connection};
268 for my $cb (@{$self->{_cb_queue}}) {
271 $self->{_queue} = [];
275 my ($self, $ioloop, $id, $data) = @_;
277 warn "<<<< _on_read ",dump( $id, $data );
279 $data = $self->{message}->{$id} .= $data;
280 my ($magic, $type, $len) = unpack( "a4NN", $data );
282 if ( substr($data,0,1) ne "\x00" ) {
283 return if $data !~ s/\.[\n\r]$//s;
284 } elsif ( length $data < $len ) {
285 warn "# _on_read incomplete message";
289 warn "#### data ",dump($data);
291 my $cb = shift @{$self->{_cb_queue}};
293 # Mojo::Util::decode($self->encoding, $data) if $data; # FIXME
294 warn "# on read callback with ", dump($data);
300 delete $self->{message}->{$id};
302 # Reset error after callback dispatching
311 MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
317 my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
321 L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
325 L<MojoX::Gearman> implements the following attributes.
329 my $server = $gearman->server;
330 $gearman = $gearman->server('127.0.0.1:4730');
332 C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
336 my $ioloop = $gearman->ioloop;
337 $gearman = $gearman->ioloop(Mojo::IOLoop->new);
339 Loop object to use for I/O operations, by default a newly created L<Mojo::IOLoop>
344 my $timeout = $gearman->timeout;
345 $gearman = $gearman->timeout(100);
347 Maximum amount of time in seconds a connection can be inactive before being
348 dropped, defaults to C<300>.
352 my $encoding = $gearman->encoding;
353 $gearman = $gearman->encoding('UTF-8');
355 Encoding applied to request data before it is sent, defaults to C<UTF-8>.
361 $gearman->req( $type, $data, ..., sub { # callback } );
365 $gearman->execute("ping" => sub {
366 my ($gearman, $result) = @_;
367 die $gearman->error unless defined $result;
370 Returns the error that occurred during command execution.
371 Note that this method reports the error only for the current command and
372 can only be used inside a callback.
376 $gearman->on_error(sub{
378 warn 'Gearman error ', $gearman->error, "\n";
381 Executed if an error occurred. Called before the command callbacks.
387 Starts the IOLoop. Shortcut for C<< $gearman->ioloop->start >>.
391 L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
399 https://github.com/dpavlin/mojox-gearman
403 Dobrica Pavlinusic, C<dpavlin@rot13.org>.
405 =head1 COPYRIGHT AND LICENSE
407 Copyright (C) 2010, Dobrica Pavlinusic
409 This program is free software, you can redistribute it and/or modify it under
410 the terms of the Artistic License version 2.0.