pass request type name
[MojoX-Gearman.git] / lib / MojoX / Gearman.pm
1 package MojoX::Gearman;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = 0.1;
7 use base 'Mojo::Base';
8
9 use Mojo::IOLoop;
10 use List::Util    ();
11 use Mojo::Util    ();
12 use Scalar::Util        ();
13 use Data::Dump qw(dump);
14 require Carp;
15
16 __PACKAGE__->attr(server   => '127.0.0.1:4730');
17 __PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->singleton });
18 __PACKAGE__->attr(error => undef);
19 __PACKAGE__->attr(timeout  => 300);
20 __PACKAGE__->attr(encoding => 'UTF-8');
21 __PACKAGE__->attr(
22         on_error => sub {
23                 sub {
24                         my $gearman = shift;
25                         warn "Gearman error: ", $gearman->error, "\n";
26                   }
27         }
28 );
29
30 sub DESTROY {
31         my $self = shift;
32
33         # Loop
34         return unless my $loop = $self->ioloop;
35
36         # Cleanup connection
37         $loop->drop($self->{_connection})
38           if $self->{_connection};
39 }
40
41 sub connect {
42         my $self = shift;
43
44         # drop old connection
45         if ($self->connected) {
46                 $self->ioloop->drop($self->{_connection});
47         }
48
49         $self->server =~ m{^([^:]+)(:(\d+))?};
50         my $address = $1;
51         my $port = $3 || 4730;
52
53         Scalar::Util::weaken $self;
54
55         # connect
56         $self->{_connecting} = 1;
57         $self->{_connection} = $self->ioloop->connect(
58                 {   address     => $address,
59                         port       => $port,
60                         on_connect => sub { $self->_on_connect(@_) },
61                         on_read => sub { $self->_on_read(@_) },
62                         on_error   => sub { $self->_on_error(@_) },
63                         on_hup   => sub { $self->_on_hup(@_) },
64                 }
65         );
66
67         return $self;
68 }
69
70 sub connected {
71         my $self = shift;
72
73         return $self->{_connection};
74 }
75
76 my $packet_type = {
77         SUBMIT_JOB => 7,
78         JOB_CREATED => 8,
79
80         WORK_COMPLETE => 13,
81
82         ECHO_REQ => 16,
83         ECHO_RES => 17,
84
85         ERROR => 19,
86 };
87
88 my $nr2type;
89 $nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
90
91
92 sub parse_packet {
93         my ($self,$data) = @_;
94         my ($magic, $type, $len) = unpack( "a4NN", $data );
95         die "wrong magic [$magic]" unless $magic eq "\0RES";
96         die "unsupported type [$type]" unless exists $nr2type->{$type};
97         die "ERROR" if $type == $packet_type->{ERROR};
98         return ( $type, split("\0", substr($data,12,$len)) );
99 }
100
101 sub res {
102         my ( $self, $value ) = @_;
103         if ( defined $value ) {
104                 warn "# ++ res = ",dump $value;
105                 $self->{_res} ||= $value;
106         }
107         return $self->{_res};
108 }
109
110 sub req {
111         my $self = shift;
112         my $type = shift;
113         my $data = join("\0", @_);
114
115         die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
116         Mojo::Util::encode($self->encoding, $data) if $self->encoding;
117
118         $self->{_res} = undef;
119
120         my $response;
121         my $cb = sub {
122                 my ( $self, $data ) = @_;
123                 $self->stop;
124                 warn "# <<<< ",dump($data);
125                 my ( $type, @data ) = $self->parse_packet($data);
126
127                 if ( $type == $packet_type->{JOB_CREATED} ) {
128                         push @{ $self->{_cb_queue} }, sub {
129                                 my ( $self,$data ) = @_;
130 warn "# WORK_COMPLETE ",dump $data;
131                                 my ( $type, $handle, $out ) = $self->parse_packet($data);
132                                 die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
133                                 $self->res( $out );
134                                 $self->stop;
135                         };
136                         $self->start; # FIXME sync client
137                 }
138
139                 my $out = $#data == 0 ? $data[0] : [ @data ];
140                 $self->res( $out );
141
142         };
143
144         $data .= "\0" if $data;
145         my $len = length($data);
146         my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data;
147         warn "# >>>> ",dump($message);
148
149         my $mqueue = $self->{_message_queue} ||= [];
150         my $cqueue = $self->{_cb_queue}   ||= [];
151
152         push @$mqueue, $message;
153         push @$cqueue, $cb;
154
155         $self->connect unless $self->{_connection};
156         $self->_send_next_message;
157
158         $self->ioloop->start;
159
160         $self->res;
161 }
162
163 sub start {
164         my ($self) = @_;
165
166         $self->ioloop->start;
167         return $self;
168 }
169
170 sub stop {
171         my ($self) = @_;
172
173         $self->ioloop->stop;
174         return $self;
175 }
176
177 sub _send_next_message {
178         my ($self) = @_;
179
180         if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
181                 while (my $message = shift @{$self->{_message_queue}}) {
182                         warn "# write ",dump($message);
183                         $self->ioloop->write($c, $message);
184                 }
185         }
186 }
187
188 sub _on_connect {
189         my ($self, $ioloop, $id) = @_;
190         delete $self->{_connecting};
191
192         $ioloop->connection_timeout($id => $self->timeout);
193
194         $self->_send_next_message;
195 }
196
197 sub _on_error {
198         my ($self, $ioloop, $id, $error) = @_;
199
200         $self->error($error);
201         $self->_inform_queue;
202
203         $self->on_error->($self);
204
205         $ioloop->drop($id);
206 }
207
208 sub _on_hup {
209         my ($self, $ioloop, $id) = @_;
210
211         $self->{error} ||= 'disconnected';
212         $self->_inform_queue;
213
214         delete $self->{_message_queue};
215
216         delete $self->{_connecting};
217         delete $self->{_connection};
218 }
219
220 sub _inform_queue {
221         my ($self) = @_;
222
223         for my $cb (@{$self->{_cb_queue}}) {
224                 $cb->($self) if $cb;
225         }
226         $self->{_queue} = [];
227 }
228
229 sub _on_read {
230         my ($self, $ioloop, $id, $data) = @_;
231
232         my $cb = shift @{$self->{_cb_queue}};
233         if ($cb) {
234                 Mojo::Util::decode($self->encoding, $data) if $data;
235                 warn "# on read callback with ", dump($data);
236                 $cb->($self, $data);
237         } else {
238                 warn "no callback";
239         }
240
241         # Reset error after callback dispatching
242         $self->error(undef);
243 }
244
245 1;
246 __END__
247
248 =head1 NAME
249
250 MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
251
252 =head1 SYNOPSIS
253
254         use MojoX::Gearman;
255
256         my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
257
258 =head1 DESCRIPTION
259
260 L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
261
262 =head1 ATTRIBUTES
263
264 L<MojoX::Gearman> implements the following attributes.
265
266 =head2 C<server>
267
268         my $server = $gearman->server;
269         $gearman         = $gearman->server('127.0.0.1:4730');
270
271 C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
272
273 =head2 C<ioloop>
274
275         my $ioloop = $gearman->ioloop;
276         $gearman         = $gearman->ioloop(Mojo::IOLoop->new);
277
278 Loop object to use for io operations, by default a L<Mojo::IOLoop> singleton
279 object will be used.
280
281 =head2 C<timeout>
282
283         my $timeout = $gearman->timeout;
284         $gearman          = $gearman->timeout(100);
285
286 Maximum amount of time in seconds a connection can be inactive before being
287 dropped, defaults to C<300>.
288
289 =head2 C<encoding>
290
291         my $encoding = $gearman->encoding;
292         $gearman           = $gearman->encoding('UTF-8');
293
294 Encoding used for stored data, defaults to C<UTF-8>.
295
296 =head1 METHODS
297
298 =head2 C<req>
299
300         $gearman->req( $type, $data, ..., sub { # callback } );
301
302 =head2 C<error>
303
304         $gearman->execute("ping" => sub {
305                 my ($gearman, $result) = @_;
306                 die $gearman->error unless defined $result;
307         }
308
309 Returns error occured during command execution.
310 Note that this method returns error code just from current command and
311 can be used just in callback.
312
313 =head2 C<on_error>
314
315         $gearman->on_error(sub{
316                 my $gearman = shift;
317                 warn 'Gearman error ', $gearman->error, "\n";
318         });
319
320 Executes if error occured. Called before commands callbacks.
321
322 =head2 C<start>
323
324         $gearman->start;
325
326 Starts IOLoop. Shortcut for $gearman->ioloop->start;
327
328 =head1 SEE ALSO
329
330 L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
331
332 =head1 SUPPORT
333
334 =head1 DEVELOPMENT
335
336 =head2 Repository
337
338         https://github.com/dpavlin/mojox-gearman
339
340 =head1 AUTHOR
341
342 Dobrica Pavlinusic, C<dpavlin@rot13.org>.
343
344 =head1 COPYRIGHT AND LICENSE
345
346 Copyright (C) 2010, Dobrica Pavlinusic
347
348 This program is free software, you can gearmantribute it and/or modify it under
349 the terms of the Artistic License version 2.0.
350
351 =cut