1 package MojoX::Gearman;
13 use Data::Dump qw(dump);
16 __PACKAGE__->attr(server => '127.0.0.1:4730');
17 __PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
18 __PACKAGE__->attr(error => undef);
19 __PACKAGE__->attr(timeout => 300);
20 __PACKAGE__->attr(encoding => 'UTF-8');
25 warn "Gearman error: ", $gearman->error, "\n";
34 return unless my $loop = $self->ioloop;
37 $loop->drop($self->{_connection})
38 if $self->{_connection};
45 if ($self->connected) {
46 $self->ioloop->drop($self->{_connection});
49 $self->server =~ m{^([^:]+)(:(\d+))?};
51 my $port = $3 || 4730;
53 Scalar::Util::weaken $self;
56 $self->{_connecting} = 1;
57 $self->{_connection} = $self->ioloop->connect(
58 { address => $address,
60 on_connect => sub { $self->_on_connect(@_) },
61 on_read => sub { $self->_on_read(@_) },
62 on_error => sub { $self->_on_error(@_) },
63 on_hup => sub { $self->_on_hup(@_) },
73 return $self->{_connection};
79 my $data = join("\0", @_);
81 Mojo::Util::encode($self->encoding, $data) if $self->encoding;
86 my ( $self, $data ) = @_;
88 warn "# <<<< ",dump($data);
89 my ($magic, $type, $len) = unpack( "a4NN", $data );
90 die "wrong magic [$magic]" unless $magic eq "\0RES";
91 die "ERROR" if $type == 19;
92 $ret = substr($data,12,$len);
95 my $len = length($data);
96 my $message = pack("a4NN", "\0REQ", $type, length $data ) . $data;
97 warn "# >>>> ",dump($data);
99 my $mqueue = $self->{_message_queue} ||= [];
100 my $cqueue = $self->{_cb_queue} ||= [];
103 push @$mqueue, $message;
106 $self->connect unless $self->{_connection};
107 $self->_send_next_message;
109 $self->ioloop->start;
117 $self->ioloop->start;
128 sub _send_next_message {
131 if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
132 while (my $message = shift @{$self->{_message_queue}}) {
133 warn "# write ",dump($message);
134 $self->ioloop->write($c, $message);
140 my ($self, $ioloop, $id) = @_;
141 delete $self->{_connecting};
143 $ioloop->connection_timeout($id => $self->timeout);
145 $self->_send_next_message;
149 my ($self, $ioloop, $id, $error) = @_;
151 $self->error($error);
152 $self->_inform_queue;
154 $self->on_error->($self);
160 my ($self, $ioloop, $id) = @_;
162 $self->{error} ||= 'disconnected';
163 $self->_inform_queue;
165 delete $self->{_message_queue};
167 delete $self->{_connecting};
168 delete $self->{_connection};
174 for my $cb (@{$self->{_cb_queue}}) {
177 $self->{_queue} = [];
181 my ($self, $ioloop, $id, $data) = @_;
183 my $cb = shift @{$self->{_cb_queue}};
185 Mojo::Util::decode($self->encoding, $data) if $data;
186 warn "# on read callback with ", dump($data);
192 # Reset error after callback dispatching
201 MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
207 my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
211 L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
215 L<MojoX::Gearman> implements the following attributes.
219 my $server = $gearman->server;
220 $gearman = $gearman->server('127.0.0.1:4730');
222 C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
226 my $ioloop = $gearman->ioloop;
227 $gearman = $gearman->ioloop(Mojo::IOLoop->new);
229 Loop object to use for io operations, by default a L<Mojo::IOLoop> singleton
234 my $timeout = $gearman->timeout;
235 $gearman = $gearman->timeout(100);
237 Maximum amount of time in seconds a connection can be inactive before being
238 dropped, defaults to C<300>.
242 my $encoding = $gearman->encoding;
243 $gearman = $gearman->encoding('UTF-8');
245 Encoding used for stored data, defaults to C<UTF-8>.
251 $gearman->req( $type, $data );
255 $gearman->execute("ping" => sub {
256 my ($gearman, $result) = @_;
257 die $gearman->error unless defined $result;
260 Returns error occured during command execution.
261 Note that this method returns error code just from current command and
262 can be used just in callback.
266 $gearman->on_error(sub{
268 warn 'Gearman error ', $gearman->error, "\n";
271 Executes if error occured. Called before commands callbacks.
277 Starts IOLoop. Shortcut for $gearman->ioloop->start;
281 L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
289 https://github.com/dpavlin/mojox-gearman
293 Dobrica Pavlinusic, C<dpavlin@rot13.org>.
295 =head1 COPYRIGHT AND LICENSE
297 Copyright (C) 2010, Dobrica Pavlinusic
299 This program is free software, you can gearmantribute it and/or modify it under
300 the terms of the Artistic License version 2.0.