added timeout for JOB_CREATED
[MojoX-Gearman.git] / lib / MojoX / Gearman.pm
1 package MojoX::Gearman;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = 0.1;
7 use base 'Mojo::Base';
8
9 use Mojo::IOLoop;
10 use List::Util    ();
11 use Mojo::Util    ();
12 use Scalar::Util        ();
13 use Data::Dump qw(dump);
14 require Carp;
15
16 __PACKAGE__->attr(server   => '127.0.0.1:4730');
17 __PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->new });
18 __PACKAGE__->attr(error => undef);
19 __PACKAGE__->attr(timeout  => 5);
20 __PACKAGE__->attr(encoding => 'UTF-8');
21 __PACKAGE__->attr(
22         on_error => sub {
23                 sub {
24                         my $gearman = shift;
25                         warn "Gearman error: ", $gearman->error, "\n";
26                   }
27         }
28 );
29 __PACKAGE__->attr(res => undef);
30
31 sub DESTROY {
32         my $self = shift;
33
34         # Loop
35         return unless my $loop = $self->ioloop;
36
37         # Cleanup connection
38         $loop->drop($self->{_connection})
39           if $self->{_connection};
40 }
41
42 sub connect {
43         my $self = shift;
44
45         # drop old connection
46         if ($self->connected) {
47                 $self->ioloop->drop($self->{_connection});
48         }
49
50         $self->server =~ m{^([^:]+)(:(\d+))?};
51         my $address = $1;
52         my $port = $3 || 4730;
53
54         Scalar::Util::weaken $self;
55
56         # connect
57         $self->{_connecting} = 1;
58         $self->{_connection} = $self->ioloop->connect(
59                 {   address     => $address,
60                         port       => $port,
61                         on_connect => sub { $self->_on_connect(@_) },
62                         on_read => sub { $self->_on_read(@_) },
63                         on_error   => sub { $self->_on_error(@_) },
64                         on_hup   => sub { $self->_on_hup(@_) },
65                 }
66         );
67
68         warn "# using gearman server $address:$port\n";
69
70         return $self;
71 }
72
73 sub connected {
74         my $self = shift;
75
76         return $self->{_connection};
77 }
78
79 my $packet_type = {
80         TEXT => -1,     # fake type for text protocol
81
82         CAN_DO => 1,
83
84         PRE_SLEEP => 4,
85
86         NOOP => 6,
87
88         SUBMIT_JOB => 7,
89         JOB_CREATED => 8,
90
91         GRAB_JOB => 9,
92         NO_JOB => 10,
93         JOB_ASSIGN => 11,
94
95         WORK_COMPLETE => 13,
96         WORK_FAIL => 14,
97
98         ECHO_REQ => 16,
99         ECHO_RES => 17,
100
101         ERROR => 19,
102 };
103
104 my $nr2type;
105 $nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
106
107
108 sub parse_packet {
109         my ($self,$data) = @_;
110         die "no data in packet" unless $data;
111         my ($magic, $type, $len) = unpack( "a4NN", $data );
112         die "wrong magic [$magic]" unless $magic eq "\0RES";
113         die "unsupported type [$type]" unless exists $nr2type->{$type};
114         die "ERROR" if $type == $packet_type->{ERROR};
115         return ( $type, split("\0", substr($data,12,$len)) );
116 }
117
118 sub req {
119         my $self = shift;
120 warn "XXX req ",dump(@_);
121         my $type = shift;
122         my $callback = pop @_ if ref $_[$#_] eq 'CODE';
123         my $data = join("\0", @_);
124
125         die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
126         Mojo::Util::encode($self->encoding, $data) if $self->encoding;
127
128         $self->{_res} = undef;
129
130         my $response;
131         my $cb = sub {
132                 my ( $self, $data ) = @_;
133
134                 if ( substr($data,0,1) ne "\x00" ) {
135                         $self->res( $data );
136                         $self->stop;
137                         return;
138                 }
139
140                 my ( $type, $handle, @data ) = $self->parse_packet($data);
141                 warn "# <<<< ", $nr2type->{$type}, " $handle ",dump(@data);
142
143                 if ( $type == $packet_type->{JOB_CREATED} ) {
144                         push @{ $self->{_cb_queue} }, sub {
145                                 my ( $self,$data ) = @_;
146                                 my ( $type, $handle, $out ) = $self->parse_packet($data);
147 warn "# <<<< ",$nr2type->{$type}, " ",dump $data;
148                                 if ( $type == $packet_type->{WORK_COMPLETE} ) {
149                                         warn "WORK_COMPLETE $handle ", dump $out;
150                                 } elsif ( $type == $packet_type->{WORK_FAIL} ) {
151                                         warn "WORK_FAIL $handle ", dump $out;
152                                 }
153                                 $self->res( $out );
154                                 $self->stop;
155                         };
156                         $self->ioloop->timer( $self->timeout => sub {
157                                 my $self = shift;
158                                 warn "TIMEOUT $handle ", $self->timeout, "s for result";
159                                 $self->stop;
160                         });
161                 } elsif ( $type == $packet_type->{NO_JOB} ) {
162                         $self->req( 'PRE_SLEEP' );
163                         $self->stop;
164                 } elsif ( $type == $packet_type->{JOB_ASSIGN} ) {
165                         my ( $function, $workload ) = @data;
166                         my $callback = $self->{_function}->{$function};
167                         die "no $function callback" unless ref $callback eq 'CODE';
168                         my $out = $callback->( $workload );
169                         warn "## $data $callback = ", dump $out;
170                         $self->req( 'WORK_COMPLETE', $handle, $out );
171                         $self->req( 'GRAB_JOB' );
172                 } elsif ( $type == $packet_type->{NOOP} ) {
173                         $self->req( 'GRAB_JOB' );
174                 } else {
175                         $self->stop;
176                 }
177
178                 my $out = $#data == 0 ? $data[0] : [ @data ];
179                 $self->res( $out );
180
181         };
182
183 #       $data .= "\0" if $data;
184         my $len = length($data);
185         my $message = $type ne 'TEXT'
186                 ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data
187                 : "$data\r\n"
188         ;
189         warn "# >>>> $type ",dump($message);
190
191         my $mqueue = $self->{_message_queue} ||= [];
192         my $cqueue = $self->{_cb_queue}   ||= [];
193
194         push @$mqueue, $message;
195         push @$cqueue, $cb;
196
197         $self->connect unless $self->{_connection};
198         $self->_send_next_message;
199
200         if ( $type eq 'CAN_DO' ) {
201                 $self->{_function}->{$data} = $callback;
202                 $self->res( $callback );
203                 warn "# installed $data callback $callback";
204         } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands
205                 $self->start;
206         }
207
208         $self->res;
209 }
210
211 sub start {
212         my ($self) = @_;
213         warn "# start";
214         $self->ioloop->start;
215         return $self;
216 }
217
218 sub stop {
219         my ($self) = @_;
220         warn "# stop";
221         $self->ioloop->stop;
222         return $self;
223 }
224
225 sub _send_next_message {
226         my ($self) = @_;
227
228         if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
229                 while (my $message = shift @{$self->{_message_queue}}) {
230                         warn "# write ",dump($message);
231                         $self->ioloop->write($c, $message);
232                 }
233         }
234 }
235
236 sub _on_connect {
237         my ($self, $ioloop, $id) = @_;
238         delete $self->{_connecting};
239
240         $ioloop->connection_timeout($id => $self->timeout);
241
242         $self->_send_next_message;
243 }
244
245 sub _on_error {
246         my ($self, $ioloop, $id, $error) = @_;
247
248         warn "ERROR: $error";
249
250         $self->error($error);
251         $self->_inform_queue;
252
253         $self->on_error->($self);
254
255         $ioloop->drop($id);
256 }
257
258 sub _on_hup {
259         my ($self, $ioloop, $id) = @_;
260
261         $self->{error} ||= 'disconnected';
262         $self->_inform_queue;
263
264         delete $self->{_message_queue};
265
266         delete $self->{_connecting};
267         delete $self->{_connection};
268 }
269
270 sub _inform_queue {
271         my ($self) = @_;
272
273         for my $cb (@{$self->{_cb_queue}}) {
274                 $cb->($self) if $cb;
275         }
276         $self->{_queue} = [];
277 }
278
279 sub _on_read {
280         my ($self, $ioloop, $id, $data) = @_;
281
282         warn "<<<< _on_read ",dump( $id, $data );
283
284         $data = $self->{message}->{$id} .= $data;
285         my ($magic, $type, $len) = unpack( "a4NN", $data );
286
287         if ( substr($data,0,1) ne "\x00" ) {
288                 return if $data !~ s/\.[\n\r]$//s;
289         } elsif ( length $data < $len ) {
290                 warn "# _on_read incomplete message";
291                 return;
292         }
293
294         warn "#### data ",dump($data);
295
296         my $cb = shift @{$self->{_cb_queue}};
297         if ($cb) {
298 #               Mojo::Util::decode($self->encoding, $data) if $data; # FIXME
299                 warn "# on read callback with ", dump($data);
300                 $cb->($self, $data);
301         } else {
302                 warn "no callback";
303         }
304
305         delete $self->{message}->{$id};
306
307         # Reset error after callback dispatching
308         $self->error(undef);
309 }
310
311 1;
312 __END__
313
314 =head1 NAME
315
316 MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
317
318 =head1 SYNOPSIS
319
320         use MojoX::Gearman;
321
322         my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
323
324 =head1 DESCRIPTION
325
326 L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
327
328 =head1 ATTRIBUTES
329
330 L<MojoX::Gearman> implements the following attributes.
331
332 =head2 C<server>
333
334         my $server = $gearman->server;
335         $gearman         = $gearman->server('127.0.0.1:4730');
336
337 C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
338
339 =head2 C<ioloop>
340
341         my $ioloop = $gearman->ioloop;
342         $gearman         = $gearman->ioloop(Mojo::IOLoop->new);
343
344 Loop object to use for io operations, by default a L<Mojo::IOLoop> singleton
345 object will be used.
346
347 =head2 C<timeout>
348
349         my $timeout = $gearman->timeout;
350         $gearman          = $gearman->timeout(100);
351
352 Maximum amount of time in seconds a connection can be inactive before being
353 dropped, defaults to C<300>.
354
355 =head2 C<encoding>
356
357         my $encoding = $gearman->encoding;
358         $gearman           = $gearman->encoding('UTF-8');
359
360 Encoding used for stored data, defaults to C<UTF-8>.
361
362 =head1 METHODS
363
364 =head2 C<req>
365
366         $gearman->req( $type, $data, ..., sub { # callback } );
367
368 =head2 C<error>
369
370         $gearman->execute("ping" => sub {
371                 my ($gearman, $result) = @_;
372                 die $gearman->error unless defined $result;
373         }
374
375 Returns error occured during command execution.
376 Note that this method returns error code just from current command and
377 can be used just in callback.
378
379 =head2 C<on_error>
380
381         $gearman->on_error(sub{
382                 my $gearman = shift;
383                 warn 'Gearman error ', $gearman->error, "\n";
384         });
385
386 Executes if error occured. Called before commands callbacks.
387
388 =head2 C<start>
389
390         $gearman->start;
391
392 Starts IOLoop. Shortcut for $gearman->ioloop->start;
393
394 =head1 SEE ALSO
395
396 L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
397
398 =head1 SUPPORT
399
400 =head1 DEVELOPMENT
401
402 =head2 Repository
403
404         https://github.com/dpavlin/mojox-gearman
405
406 =head1 AUTHOR
407
408 Dobrica Pavlinusic, C<dpavlin@rot13.org>.
409
410 =head1 COPYRIGHT AND LICENSE
411
412 Copyright (C) 2010, Dobrica Pavlinusic
413
414 This program is free software, you can gearmantribute it and/or modify it under
415 the terms of the Artistic License version 2.0.
416
417 =cut