8c87379ca6aaf609e5f0fe791483a67d89c9d482
[MojoX-Gearman.git] / lib / MojoX / Gearman.pm
1 package MojoX::Gearman;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = 0.1;
7 use base 'Mojo::Base';
8
9 use Mojo::IOLoop;
10 use List::Util    ();
11 use Mojo::Util    ();
12 use Scalar::Util        ();
13 use Data::Dump qw(dump);
14 require Carp;
15
16 __PACKAGE__->attr(server   => '127.0.0.1:4730');
17 __PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->singleton });
18 __PACKAGE__->attr(error => undef);
19 __PACKAGE__->attr(timeout  => 300);
20 __PACKAGE__->attr(encoding => 'UTF-8');
21 __PACKAGE__->attr(
22         on_error => sub {
23                 sub {
24                         my $gearman = shift;
25                         warn "Gearman error: ", $gearman->error, "\n";
26                   }
27         }
28 );
29
30 sub DESTROY {
31         my $self = shift;
32
33         # Loop
34         return unless my $loop = $self->ioloop;
35
36         # Cleanup connection
37         $loop->drop($self->{_connection})
38           if $self->{_connection};
39 }
40
41 sub connect {
42         my $self = shift;
43
44         # drop old connection
45         if ($self->connected) {
46                 $self->ioloop->drop($self->{_connection});
47         }
48
49         $self->server =~ m{^([^:]+)(:(\d+))?};
50         my $address = $1;
51         my $port = $3 || 4730;
52
53         Scalar::Util::weaken $self;
54
55         # connect
56         $self->{_connecting} = 1;
57         $self->{_connection} = $self->ioloop->connect(
58                 {   address     => $address,
59                         port       => $port,
60                         on_connect => sub { $self->_on_connect(@_) },
61                         on_read => sub { $self->_on_read(@_) },
62                         on_error   => sub { $self->_on_error(@_) },
63                         on_hup   => sub { $self->_on_hup(@_) },
64                 }
65         );
66
67         return $self;
68 }
69
70 sub connected {
71         my $self = shift;
72
73         return $self->{_connection};
74 }
75
76 sub req {
77         my $self = shift;
78         my $type = shift;
79         my $data = join("\0", @_);
80
81         Mojo::Util::encode($self->encoding, $data) if $self->encoding;
82
83         my $ret;
84
85         my $cb = sub {
86                 my ( $self, $data ) = @_;
87                 $self->ioloop->stop;
88                 warn "# <<<< ",dump($data);
89                 my ($magic, $type, $len) = unpack( "a4NN", $data );
90                 die "wrong magic [$magic]" unless $magic eq "\0RES";
91                 $ret = substr($data,12,$len);
92         };
93
94         my $len = length($data);
95         my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data;
96         warn "# >>>> ",dump($data);
97
98         my $mqueue = $self->{_message_queue} ||= [];
99         my $cqueue = $self->{_cb_queue}   ||= [];
100
101
102         push @$mqueue, $message;
103         push @$cqueue, $cb;
104
105         $self->connect unless $self->{_connection};
106         $self->_send_next_message;
107
108         $self->ioloop->start;
109
110         return $ret;
111 }
112
113 sub start {
114         my ($self) = @_;
115
116         $self->ioloop->start;
117         return $self;
118 }
119
120 sub stop {
121         my ($self) = @_;
122
123         $self->ioloop->stop;
124         return $self;
125 }
126
127 sub _send_next_message {
128         my ($self) = @_;
129
130         if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
131                 while (my $message = shift @{$self->{_message_queue}}) {
132                 warn "# write ",dump($message);
133                 $self->ioloop->write($c, $message);
134                 }
135         }
136 }
137
138 sub _on_connect {
139         my ($self, $ioloop, $id) = @_;
140         delete $self->{_connecting};
141
142         $ioloop->connection_timeout($id => $self->timeout);
143
144         $self->_send_next_message;
145 }
146
147 sub _on_error {
148         my ($self, $ioloop, $id, $error) = @_;
149
150         $self->error($error);
151         $self->_inform_queue;
152
153         $self->on_error->($self);
154
155         $ioloop->drop($id);
156 }
157
158 sub _on_hup {
159         my ($self, $ioloop, $id) = @_;
160
161         $self->{error} ||= 'disconnected';
162         $self->_inform_queue;
163
164         delete $self->{_message_queue};
165
166         delete $self->{_connecting};
167         delete $self->{_connection};
168 }
169
170 sub _inform_queue {
171         my ($self) = @_;
172
173         for my $cb (@{$self->{_cb_queue}}) {
174                 $cb->($self) if $cb;
175         }
176         $self->{_queue} = [];
177 }
178
179 sub _on_read {
180         my ($self, $ioloop, $id, $data) = @_;
181
182         my $cb = shift @{$self->{_cb_queue}};
183         if ($cb) {
184                 Mojo::Util::decode($self->encoding, $data) if $data;
185                 warn "# on read callback with ", dump($data);
186                 $cb->($self, $data);
187         } else {
188                 warn "no callback";
189         }
190
191         # Reset error after callback dispatching
192         $self->error(undef);
193 }
194
195 1;
196 __END__
197
198 =head1 NAME
199
200 MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
201
202 =head1 SYNOPSIS
203
204         use MojoX::Gearman;
205
206         my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
207
208 =head1 DESCRIPTION
209
210 L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
211
212 =head1 ATTRIBUTES
213
214 L<MojoX::Gearman> implements the following attributes.
215
216 =head2 C<server>
217
218         my $server = $gearman->server;
219         $gearman         = $gearman->server('127.0.0.1:4730');
220
221 C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
222
223 =head2 C<ioloop>
224
225         my $ioloop = $gearman->ioloop;
226         $gearman         = $gearman->ioloop(Mojo::IOLoop->new);
227
228 Loop object to use for io operations, by default a L<Mojo::IOLoop> singleton
229 object will be used.
230
231 =head2 C<timeout>
232
233         my $timeout = $gearman->timeout;
234         $gearman          = $gearman->timeout(100);
235
236 Maximum amount of time in seconds a connection can be inactive before being
237 dropped, defaults to C<300>.
238
239 =head2 C<encoding>
240
241         my $encoding = $gearman->encoding;
242         $gearman           = $gearman->encoding('UTF-8');
243
244 Encoding used for stored data, defaults to C<UTF-8>.
245
246 =head1 METHODS
247
248 =head2 C<req>
249
250         $gearman->req( $type, $data );
251
252 =head2 C<error>
253
254         $gearman->execute("ping" => sub {
255                 my ($gearman, $result) = @_;
256                 die $gearman->error unless defined $result;
257         }
258
259 Returns error occured during command execution.
260 Note that this method returns error code just from current command and
261 can be used just in callback.
262
263 =head2 C<on_error>
264
265         $gearman->on_error(sub{
266                 my $gearman = shift;
267                 warn 'Gearman error ', $gearman->error, "\n";
268         });
269
270 Executes if error occured. Called before commands callbacks.
271
272 =head2 C<start>
273
274         $gearman->start;
275
276 Starts IOLoop. Shortcut for $gearman->ioloop->start;
277
278 =head1 SEE ALSO
279
280 L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
281
282 =head1 SUPPORT
283
284 =head1 DEVELOPMENT
285
286 =head2 Repository
287
288         https://github.com/dpavlin/mojox-gearman
289
290 =head1 AUTHOR
291
292 Dobrica Pavlinusic, C<dpavlin@rot13.org>.
293
294 =head1 COPYRIGHT AND LICENSE
295
296 Copyright (C) 2010, Dobrica Pavlinusic
297
298 This program is free software, you can gearmantribute it and/or modify it under
299 the terms of the Artistic License version 2.0.
300
301 =cut