first working echo with Gearman
authorDobrica Pavlinusic <dpavlin@rot13.org>
Mon, 28 Feb 2011 17:11:11 +0000 (17:11 +0000)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Mon, 28 Feb 2011 17:13:22 +0000 (17:13 +0000)
Makefile.PL [new file with mode: 0644]
lib/MojoX/Gearman.pm [new file with mode: 0644]
t/gearman.t [new file with mode: 0755]

diff --git a/Makefile.PL b/Makefile.PL
new file mode 100644 (file)
index 0000000..89981f2
--- /dev/null
@@ -0,0 +1,14 @@
+use ExtUtils::MakeMaker;
+# See lib/ExtUtils/MakeMaker.pm for details of how to influence
+# the contents of the Makefile that is written.
+WriteMakefile(
+       NAME         => 'MojoX::Gearman',
+       VERSION_FROM => 'lib/MojoX/Gearman.pm', # finds $VERSION
+       PREREQ_PM    => {
+               'Test::Simple' => 0.44,
+               'Mojolicious'  => 0.9003,
+       },
+       AUTHOR       => 'Dobrica Pavlinusic <dpavlin@rot13.org>',
+       ABSTRACT_FROM => 'lib/MojoX/Gearman.pm',
+);
+
diff --git a/lib/MojoX/Gearman.pm b/lib/MojoX/Gearman.pm
new file mode 100644 (file)
index 0000000..6565e30
--- /dev/null
@@ -0,0 +1,300 @@
+package MojoX::Gearman;
+
+use strict;
+use warnings;
+
+our $VERSION = 0.1;
+use base 'Mojo::Base';
+
+use Mojo::IOLoop;
+use List::Util   ();
+use Mojo::Util   ();
+use Scalar::Util       ();
+use Data::Dump qw(dump);
+require Carp;
+
+__PACKAGE__->attr(server   => '127.0.0.1:4730');
+__PACKAGE__->attr(ioloop   => sub { Mojo::IOLoop->singleton });
+__PACKAGE__->attr(error        => undef);
+__PACKAGE__->attr(timeout  => 300);
+__PACKAGE__->attr(encoding => 'UTF-8');
+__PACKAGE__->attr(
+       on_error => sub {
+               sub {
+                       my $gearman = shift;
+                       warn "Gearman error: ", $gearman->error, "\n";
+                 }
+       }
+);
+
+sub DESTROY {
+       my $self = shift;
+
+       # Loop
+       return unless my $loop = $self->ioloop;
+
+       # Cleanup connection
+       $loop->drop($self->{_connection})
+         if $self->{_connection};
+}
+
+sub connect {
+       my $self = shift;
+
+       # drop old connection
+       if ($self->connected) {
+               $self->ioloop->drop($self->{_connection});
+       }
+
+       $self->server =~ m{^([^:]+)(:(\d+))?};
+       my $address = $1;
+       my $port = $3 || 4730;
+
+       Scalar::Util::weaken $self;
+
+       # connect
+       $self->{_connecting} = 1;
+       $self->{_connection} = $self->ioloop->connect(
+               {   address     => $address,
+                       port       => $port,
+                       on_connect => sub { $self->_on_connect(@_) },
+                       on_read => sub { $self->_on_read(@_) },
+                       on_error   => sub { $self->_on_error(@_) },
+                       on_hup   => sub { $self->_on_hup(@_) },
+               }
+       );
+
+       return $self;
+}
+
+sub connected {
+       my $self = shift;
+
+       return $self->{_connection};
+}
+
+sub req {
+       my ( $self, $type, $data ) = @_;
+
+       Mojo::Util::encode($self->encoding, $data) if $self->encoding;
+
+       my $ret;
+
+       my $cb = sub {
+               my ( $self, $data ) = @_;
+               $self->ioloop->stop;
+               warn "# <<<< ",dump($data);
+               my ($magic, $type, $len) = unpack( "a4NN", $data );
+               die "wrong magic [$magic]" unless $magic eq "\0RES";
+               $ret = $data;
+       };
+
+       $data .= "\x00";
+       my $len = length($data);
+       my $message = pack("a4NN", "\0REQ", 16, length $data ) . $data;
+       warn "# >>>> ",dump($data);
+
+       my $mqueue = $self->{_message_queue} ||= [];
+       my $cqueue = $self->{_cb_queue}   ||= [];
+
+
+       push @$mqueue, $message;
+       push @$cqueue, $cb;
+
+       $self->connect unless $self->{_connection};
+       $self->_send_next_message;
+
+       $self->ioloop->start;
+
+       return $ret;
+}
+
+sub start {
+       my ($self) = @_;
+
+       $self->ioloop->start;
+       return $self;
+}
+
+sub stop {
+       my ($self) = @_;
+
+       $self->ioloop->stop;
+       return $self;
+}
+
+sub _send_next_message {
+       my ($self) = @_;
+
+       if ((my $c = $self->{_connection}) && !$self->{_connecting}) {
+               while (my $message = shift @{$self->{_message_queue}}) {
+               warn "# write ",dump($message);
+               $self->ioloop->write($c, $message);
+               }
+       }
+}
+
+sub _on_connect {
+       my ($self, $ioloop, $id) = @_;
+       delete $self->{_connecting};
+
+       $ioloop->connection_timeout($id => $self->timeout);
+
+       $self->_send_next_message;
+}
+
+sub _on_error {
+       my ($self, $ioloop, $id, $error) = @_;
+
+       $self->error($error);
+       $self->_inform_queue;
+
+       $self->on_error->($self);
+
+       $ioloop->drop($id);
+}
+
+sub _on_hup {
+       my ($self, $ioloop, $id) = @_;
+
+       $self->{error} ||= 'disconnected';
+       $self->_inform_queue;
+
+       delete $self->{_message_queue};
+
+       delete $self->{_connecting};
+       delete $self->{_connection};
+}
+
+sub _inform_queue {
+       my ($self) = @_;
+
+       for my $cb (@{$self->{_cb_queue}}) {
+               $cb->($self) if $cb;
+       }
+       $self->{_queue} = [];
+}
+
+sub _on_read {
+       my ($self, $ioloop, $id, $data) = @_;
+
+       my $cb = shift @{$self->{_cb_queue}};
+       if ($cb) {
+               Mojo::Util::decode($self->encoding, $data) if $data;
+               warn "# on read callback with ", dump($data);
+               $cb->($self, $data);
+       } else {
+               warn "no callback";
+       }
+
+       # Reset error after callback dispatching
+       $self->error(undef);
+}
+
+1;
+__END__
+
+=head1 NAME
+
+MojoX::Gearman - asynchronous Gearman client for L<Mojolicious>.
+
+=head1 SYNOPSIS
+
+       use MojoX::Gearman;
+
+       my $gearman = MojoX::Gearman->new(server => '127.0.0.1:4730');
+
+=head1 DESCRIPTION
+
+L<MojoX::Gearman> is an asynchronous client to Gearman for Mojo.
+
+=head1 ATTRIBUTES
+
+L<MojoX::Gearman> implements the following attributes.
+
+=head2 C<server>
+
+       my $server = $gearman->server;
+       $gearman         = $gearman->server('127.0.0.1:4730');
+
+C<Gearman> server connection string, defaults to '127.0.0.1:4730'.
+
+=head2 C<ioloop>
+
+       my $ioloop = $gearman->ioloop;
+       $gearman         = $gearman->ioloop(Mojo::IOLoop->new);
+
+Loop object to use for io operations, by default a L<Mojo::IOLoop> singleton
+object will be used.
+
+=head2 C<timeout>
+
+       my $timeout = $gearman->timeout;
+       $gearman          = $gearman->timeout(100);
+
+Maximum amount of time in seconds a connection can be inactive before being
+dropped, defaults to C<300>.
+
+=head2 C<encoding>
+
+       my $encoding = $gearman->encoding;
+       $gearman           = $gearman->encoding('UTF-8');
+
+Encoding used for stored data, defaults to C<UTF-8>.
+
+=head1 METHODS
+
+=head2 C<req>
+
+       $gearman->req( $type, $data );
+
+=head2 C<error>
+
+       $gearman->execute("ping" => sub {
+               my ($gearman, $result) = @_;
+               die $gearman->error unless defined $result;
+       }
+
+Returns error occured during command execution.
+Note that this method returns error code just from current command and
+can be used just in callback.
+
+=head2 C<on_error>
+
+       $gearman->on_error(sub{
+               my $gearman = shift;
+               warn 'Gearman error ', $gearman->error, "\n";
+       });
+
+Executes if error occured. Called before commands callbacks.
+
+=head2 C<start>
+
+       $gearman->start;
+
+Starts IOLoop. Shortcut for $gearman->ioloop->start;
+
+=head1 SEE ALSO
+
+L<Gearman::Client>, L<Mojolicious>, L<Mojo::IOLoop>
+
+=head1 SUPPORT
+
+=head1 DEVELOPMENT
+
+=head2 Repository
+
+       https://github.com/dpavlin/mojox-gearman
+
+=head1 AUTHOR
+
+Dobrica Pavlinusic, C<dpavlin@rot13.org>.
+
+=head1 COPYRIGHT AND LICENSE
+
+Copyright (C) 2010, Dobrica Pavlinusic
+
+This program is free software, you can gearmantribute it and/or modify it under
+the terms of the Artistic License version 2.0.
+
+=cut
diff --git a/t/gearman.t b/t/gearman.t
new file mode 100755 (executable)
index 0000000..5ae8418
--- /dev/null
@@ -0,0 +1,12 @@
+#!/usr/bin/env perl
+use warnings;
+use strict;
+
+use Test::More tests => 3;
+use lib 'lib';
+
+use_ok 'MojoX::Gearman';
+
+my $g = new_ok 'MojoX::Gearman';
+
+ok( $g->req( 16, "foobar" ), 'echo' );