a7692fdc4e1ce8dc1aa92594e35c191f86218e5b
[MojoX-Gearman.git] / lib / MojoX / Gearman.pm
1 package MojoX::Gearman;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = 0.1;
7 use base 'Mojo::Base';
8
9 use Mojo::IOLoop;
10 use List::Util    ();
11 use Mojo::Util    ();
12 use Scalar::Util        ();
13 use Data::Dump qw(dump);
14 require Carp;
15
16 __PACKAGE__->attr(server   => '127.0.0.1:4730');
17 __PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->new });
18 __PACKAGE__->attr(error => undef);
19 __PACKAGE__->attr(timeout  => 300);
20 __PACKAGE__->attr(encoding => 'UTF-8');
21 __PACKAGE__->attr(
22         on_error => sub {
23                 sub {
24                         my $gearman = shift;
25                         warn "Gearman error: ", $gearman->error, "\n";
26                   }
27         }
28 );
29 __PACKAGE__->attr(res => undef);
30
31 sub DESTROY {
32         my $self = shift;
33
34         # Loop
35         return unless my $loop = $self->ioloop;
36
37         # Cleanup connection
38         $loop->drop($self->{_connection})
39           if $self->{_connection};
40 }
41
42 sub connect {
43         my $self = shift;
44
45         # drop old connection
46         if ($self->connected) {
47                 $self->ioloop->drop($self->{_connection});
48         }
49
50         $self->server =~ m{^([^:]+)(:(\d+))?};
51         my $address = $1;
52         my $port = $3 || 4730;
53
54         Scalar::Util::weaken $self;
55
56         # connect
57         $self->{_connecting} = 1;
58         $self->{_connection} = $self->ioloop->connect(
59                 {   address     => $address,
60                         port       => $port,
61                         on_connect => sub { $self->_on_connect(@_) },
62                         on_read => sub { $self->_on_read(@_) },
63                         on_error   => sub { $self->_on_error(@_) },
64                         on_hup   => sub { $self->_on_hup(@_) },
65                 }
66         );
67
68         return $self;
69 }
70
71 sub connected {
72         my $self = shift;
73
74         return $self->{_connection};
75 }
76
77 my $packet_type = {
78         SUBMIT_JOB => 7,
79         JOB_CREATED => 8,
80
81         WORK_COMPLETE => 13,
82
83         ECHO_REQ => 16,
84         ECHO_RES => 17,
85
86         ERROR => 19,
87 };
88
89 my $nr2type;
90 $nr2type->{ $packet_type->{$_} } = $_ foreach keys %$packet_type;
91
92
93 sub parse_packet {
94         my ($self,$data) = @_;
95         die "no data in packet" unless $data;
96         my ($magic, $type, $len) = unpack( "a4NN", $data );
97         die "wrong magic [$magic]" unless $magic eq "\0RES";
98         die "unsupported type [$type]" unless exists $nr2type->{$type};
99         die "ERROR" if $type == $packet_type->{ERROR};
100         return ( $type, split("\0", substr($data,12,$len)) );
101 }
102
103 sub req {
104         my $self = shift;
105         my $type = shift;
106         my $data = join("\0", @_);
107
108         die "can't find packet type $type in ", dump $packet_type unless exists $packet_type->{$type};
109         Mojo::Util::encode($self->encoding, $data) if $self->encoding;
110
111         $self->{_res} = undef;
112
113         my $response;
114         my $cb = sub {
115                 my ( $self, $data ) = @_;
116                 my ( $type, @data ) = $self->parse_packet($data);
117                 warn "# <<<< ", $nr2type->{$type}, " ",dump(@data);
118
119                 if ( $type == $packet_type->{JOB_CREATED} ) {
120                         push @{ $self->{_cb_queue} }, sub {
121                                 my ( $self,$data ) = @_;
122 warn "# WORK_COMPLETE ",dump $data;
123                                 my ( $type, $handle, $out ) = $self->parse_packet($data);
124                                 die "not WORK_COMPLETE" unless $type == $packet_type->{WORK_COMPLETE};
125                                 $self->res( $out );
126                                 $self->stop;
127                         };
128                 } else {
129                         $self->stop;
130                 }
131
132                 my $out = $#data == 0 ? $data[0] : [ @data ];
133                 $self->res( $out );
134
135         };
136
137 #       $data .= "\0" if $data;
138         my $len = length($data);
139         my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data;
140         warn "# >>>> $type ",dump($message);
141
142         my $mqueue = $self->{_message_queue} ||= [];
143         my $cqueue = $self->{_cb_queue}   ||= [];
144
145         push @$mqueue, $message;
146         push @$cqueue, $cb;
147
148         $self->connect unless $self->{_connection};
149         $self->_send_next_message;
150
151         $self->start;
152
153         $self->res;
154 }
155
156 sub start {
157         my ($self) = @_;
158         warn "# start";
159         $self->ioloop->start;
160         return $self;
161 }
162
163 sub stop {
164         my ($self) = @_;
165         warn "# stop";
166         $self->ioloop->stop;
167         return $self;
168 }
169
170 sub _send_next_message {
171         my ($self) = @_;
172
173         if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
174                 while (my $message = shift @{$self->{_message_queue}}) {
175                         warn "# write ",dump($message);
176                         $self->ioloop->write($c, $message);
177                 }
178         }
179 }
180
181 sub _on_connect {
182         my ($self, $ioloop, $id) = @_;
183         delete $self->{_connecting};
184
185         $ioloop->connection_timeout($id => $self->timeout);
186
187         $self->_send_next_message;
188 }
189
190 sub _on_error {
191         my ($self, $ioloop, $id, $error) = @_;
192
193         warn "ERROR: $error";
194
195         $self->error($error);
196         $self->_inform_queue;
197
198         $self->on_error->($self);
199
200         $ioloop->drop($id);
201 }
202
203 sub _on_hup {
204         my ($self, $ioloop, $id) = @_;
205
206         $self->{error} ||= 'disconnected';
207         $self->_inform_queue;
208
209         delete $self->{_message_queue};
210
211         delete $self->{_connecting};
212         delete $self->{_connection};
213 }
214
215 sub _inform_queue {
216         my ($self) = @_;
217
218         for my $cb (@{$self->{_cb_queue}}) {
219                 $cb->($self) if $cb;
220         }
221         $self->{_queue} = [];
222 }
223
224 sub _on_read {
225         my ($self, $ioloop, $id, $data) = @_;
226
227         my $cb = shift @{$self->{_cb_queue}};
228         if ($cb) {
229                 Mojo::Util::decode($self->encoding, $data) if $data;
230                 warn "# on read callback with ", dump($data);
231                 $cb->($self, $data);
232         } else {
233                 warn "no callback";
234         }
235
236         # Reset error after callback dispatching
237         $self->error(undef);
238 }
239
240 1;
241 __END__
242
243 =head1 NAME
244
245 MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
246
247 =head1 SYNOPSIS
248
249         use MojoX::Gearman;
250
251         my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
252
253 =head1 DESCRIPTION
254
255 L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
256
257 =head1 ATTRIBUTES
258
259 L<MojoX::Gearman> implements the following attributes.
260
261 =head2 C<server>
262
263         my $server = $gearman->server;
264         $gearman         = $gearman->server('127.0.0.1:4730');
265
266 C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
267
268 =head2 C<ioloop>
269
270         my $ioloop = $gearman->ioloop;
271         $gearman         = $gearman->ioloop(Mojo::IOLoop->new);
272
273 Loop object to use for io operations, by default a L<Mojo::IOLoop> singleton
274 object will be used.
275
276 =head2 C<timeout>
277
278         my $timeout = $gearman->timeout;
279         $gearman          = $gearman->timeout(100);
280
281 Maximum amount of time in seconds a connection can be inactive before being
282 dropped, defaults to C<300>.
283
284 =head2 C<encoding>
285
286         my $encoding = $gearman->encoding;
287         $gearman           = $gearman->encoding('UTF-8');
288
289 Encoding used for stored data, defaults to C<UTF-8>.
290
291 =head1 METHODS
292
293 =head2 C<req>
294
295         $gearman->req( $type, $data, ..., sub { # callback } );
296
297 =head2 C<error>
298
299         $gearman->execute("ping" => sub {
300                 my ($gearman, $result) = @_;
301                 die $gearman->error unless defined $result;
302         }
303
304 Returns error occured during command execution.
305 Note that this method returns error code just from current command and
306 can be used just in callback.
307
308 =head2 C<on_error>
309
310         $gearman->on_error(sub{
311                 my $gearman = shift;
312                 warn 'Gearman error ', $gearman->error, "\n";
313         });
314
315 Executes if error occured. Called before commands callbacks.
316
317 =head2 C<start>
318
319         $gearman->start;
320
321 Starts IOLoop. Shortcut for $gearman->ioloop->start;
322
323 =head1 SEE ALSO
324
325 L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
326
327 =head1 SUPPORT
328
329 =head1 DEVELOPMENT
330
331 =head2 Repository
332
333         https://github.com/dpavlin/mojox-gearman
334
335 =head1 AUTHOR
336
337 Dobrica Pavlinusic, C<dpavlin@rot13.org>.
338
339 =head1 COPYRIGHT AND LICENSE
340
341 Copyright (C) 2010, Dobrica Pavlinusic
342
343 This program is free software, you can gearmantribute it and/or modify it under
344 the terms of the Artistic License version 2.0.
345
346 =cut