replacement for APKPM::Poll
authorDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 5 Jul 2011 12:42:44 +0000 (14:42 +0200)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 5 Jul 2011 12:42:44 +0000 (14:42 +0200)
renamed to CPE since it *is* CPE poller, and now with support for
polling from Davolink and EasyGateway

lib/APKPM/CPE.pm [new file with mode: 0644]
t/CPE.t [new file with mode: 0755]

diff --git a/lib/APKPM/CPE.pm b/lib/APKPM/CPE.pm
new file mode 100644 (file)
index 0000000..b1100d5
--- /dev/null
@@ -0,0 +1,118 @@
+package APKPM::CPE;
+
+use base qw(Gearman::Driver::Worker);
+use Moose;
+use Time::HiRes qw(time);
+use Data::Dump qw(dump);
+use Redis;
+use Net::Ping;
+
+with 'APKPM::Gearman::Client';
+with 'APKPM::Gearman';
+
+sub prefix { 'CPE_' }
+
+sub poll : Job : Encode(e_json) {
+       my ( $self, $job, $workload ) = @_;
+
+       my $redis = Redis->new;
+       $redis->del( $_ ) foreach $redis->keys('CPE.*');
+
+       my $start = $self->datetime_now;
+
+       $redis->set( 'CPE.start' => $start );
+
+       my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
+
+       my $taskset = $self->gc->new_task_set;
+       my $results;
+
+       foreach my $entry ( @$entries ) {
+
+               my $username = $entry->{cn};
+
+               if ( $entry->{dhcpStatements} !~ m/fixed-address\s+(\S+)/ ) {
+                       $redis->sadd('CPE.error.no-ip' => $username);
+                       next;
+               }
+               my $ip = $1;
+
+               if ( $entry->{dhcpOption} !~ m/vendor-class-identifier\s\"([^"]+)\"/ ) {
+                       $redis->sadd('CPE.error.vendor' => $username);
+                       next;
+               }
+
+               my $vendor = $1;
+
+               $taskset->add_task('CPE_ping', "$ip $username $vendor", {
+                       on_complete => sub { push @$results, ${$_[0]} }
+               });
+               $redis->sadd('CPE.queued' => $ip);
+
+               $redis->sadd("_CPE.$vendor" => $ip); # FIXME
+       }
+
+       warn "# wait";
+       $taskset->wait;
+
+       my $finish = $self->datetime_now;
+       $redis->set( 'CPE.finish' => $finish );
+
+       my $poll;
+       foreach my $k ( $redis->keys('CPE.*') ) {
+               my $n = $k;
+               $n =~ s/^poll\.//;
+               $n =~ s/\./_/g;
+               $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
+       }
+
+       warn "# poll = ",dump $poll;
+       $self->do_background_json('Store_insert', { _table => 'CPE_poll', %$poll });
+
+       warn "# results = ", dump $results;
+       return $poll;
+}
+
+sub ping : Job : Decode(d_array) : Encode(e_json) {
+       my ( $self, $job, $workload ) = @_;
+
+       my ( $ip, $username, $vendor ) = @$workload;
+
+       my $redis = Redis->new;
+
+       return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
+
+       my $p = Net::Ping->new;
+       $p->hires;
+
+       my ( $ok, $rtt, $ping_ip ) = $p->ping( $ip );
+
+       if ( $ok ) {
+               $redis->sadd( 'CPE.ping.ok' => $ip );
+       } else {
+               $redis->sadd( 'CPE.ping.error' => $ip );
+               return { error => "ping $ip" };
+       }
+
+       $self->do_background_json( 'Store_insert', {
+                _table => 'ping',
+               username => $username,
+               timestamp => $self->datetime_now,
+               ip => $ping_ip,
+               rtt => $rtt,
+       });
+
+       if ( $vendor =~ m/SAMSUNG/ ) {
+               $redis->sadd( 'CPE.vendor.SAMSUNG' => $username );
+               $self->do_background( 'Davolink_info', "$ip $username adsl" );
+       } elsif ( $vendor =~ m/zte/ ) {
+               $redis->sadd( 'CPE.vendor.ZTE' => $username );
+               $self->do_background( 'EasyGateway_info', $ip );
+       } else {
+               $redis->sadd( 'CPE.vendor.error' => $username );
+       }
+
+       return { ip => $ip, rtt => $rtt };
+}
+
+1;
diff --git a/t/CPE.t b/t/CPE.t
new file mode 100755 (executable)
index 0000000..d884fec
--- /dev/null
+++ b/t/CPE.t
@@ -0,0 +1,18 @@
+#!/usr/bin/perl
+use strict;
+use warnings;
+
+use Test::More tests => 4;
+use Data::Dump qw(dump);
+
+use lib 'lib';
+
+use_ok 'APKPM::CPE';
+
+ok my $o = APKPM::CPE->new( server => 'localhost:4730' ), 'new';
+
+ok my $r = $o->poll( 'job', 'ab' ), 'poll';
+diag dump($r);
+
+ok my $r = $o->ping( 'job', [ '127.0.0.1', 'nobody', 'no-vendor' ] ), 'ping';
+diag dump($r);