--- /dev/null
+package APKPM::CPE;
+
+use base qw(Gearman::Driver::Worker);
+use Moose;
+use Time::HiRes qw(time);
+use Data::Dump qw(dump);
+use Redis;
+use Net::Ping;
+
+with 'APKPM::Gearman::Client';
+with 'APKPM::Gearman';
+
+sub prefix { 'CPE_' }
+
+sub poll : Job : Encode(e_json) {
+ my ( $self, $job, $workload ) = @_;
+
+ my $redis = Redis->new;
+ $redis->del( $_ ) foreach $redis->keys('CPE.*');
+
+ my $start = $self->datetime_now;
+
+ $redis->set( 'CPE.start' => $start );
+
+ my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
+
+ my $taskset = $self->gc->new_task_set;
+ my $results;
+
+ foreach my $entry ( @$entries ) {
+
+ my $username = $entry->{cn};
+
+ if ( $entry->{dhcpStatements} !~ m/fixed-address\s+(\S+)/ ) {
+ $redis->sadd('CPE.error.no-ip' => $username);
+ next;
+ }
+ my $ip = $1;
+
+ if ( $entry->{dhcpOption} !~ m/vendor-class-identifier\s\"([^"]+)\"/ ) {
+ $redis->sadd('CPE.error.vendor' => $username);
+ next;
+ }
+
+ my $vendor = $1;
+
+ $taskset->add_task('CPE_ping', "$ip $username $vendor", {
+ on_complete => sub { push @$results, ${$_[0]} }
+ });
+ $redis->sadd('CPE.queued' => $ip);
+
+ $redis->sadd("_CPE.$vendor" => $ip); # FIXME
+ }
+
+ warn "# wait";
+ $taskset->wait;
+
+ my $finish = $self->datetime_now;
+ $redis->set( 'CPE.finish' => $finish );
+
+ my $poll;
+ foreach my $k ( $redis->keys('CPE.*') ) {
+ my $n = $k;
+ $n =~ s/^poll\.//;
+ $n =~ s/\./_/g;
+ $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
+ }
+
+ warn "# poll = ",dump $poll;
+ $self->do_background_json('Store_insert', { _table => 'CPE_poll', %$poll });
+
+ warn "# results = ", dump $results;
+ return $poll;
+}
+
+sub ping : Job : Decode(d_array) : Encode(e_json) {
+ my ( $self, $job, $workload ) = @_;
+
+ my ( $ip, $username, $vendor ) = @$workload;
+
+ my $redis = Redis->new;
+
+ return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
+
+ my $p = Net::Ping->new;
+ $p->hires;
+
+ my ( $ok, $rtt, $ping_ip ) = $p->ping( $ip );
+
+ if ( $ok ) {
+ $redis->sadd( 'CPE.ping.ok' => $ip );
+ } else {
+ $redis->sadd( 'CPE.ping.error' => $ip );
+ return { error => "ping $ip" };
+ }
+
+ $self->do_background_json( 'Store_insert', {
+ _table => 'ping',
+ username => $username,
+ timestamp => $self->datetime_now,
+ ip => $ping_ip,
+ rtt => $rtt,
+ });
+
+ if ( $vendor =~ m/SAMSUNG/ ) {
+ $redis->sadd( 'CPE.vendor.SAMSUNG' => $username );
+ $self->do_background( 'Davolink_info', "$ip $username adsl" );
+ } elsif ( $vendor =~ m/zte/ ) {
+ $redis->sadd( 'CPE.vendor.ZTE' => $username );
+ $self->do_background( 'EasyGateway_info', $ip );
+ } else {
+ $redis->sadd( 'CPE.vendor.error' => $username );
+ }
+
+ return { ip => $ip, rtt => $rtt };
+}
+
+1;