replacement for APKPM::Poll
[APKPM.git] / lib / APKPM / CPE.pm
1 package APKPM::CPE;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes qw(time);
6 use Data::Dump qw(dump);
7 use Redis;
8 use Net::Ping;
9
10 with 'APKPM::Gearman::Client';
11 with 'APKPM::Gearman';
12
13 sub prefix { 'CPE_' }
14
15 sub poll : Job : Encode(e_json) {
16         my ( $self, $job, $workload ) = @_;
17
18         my $redis = Redis->new;
19         $redis->del( $_ ) foreach $redis->keys('CPE.*');
20
21         my $start = $self->datetime_now;
22
23         $redis->set( 'CPE.start' => $start );
24
25         my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
26
27         my $taskset = $self->gc->new_task_set;
28         my $results;
29
30         foreach my $entry ( @$entries ) {
31
32                 my $username = $entry->{cn};
33
34                 if ( $entry->{dhcpStatements} !~ m/fixed-address\s+(\S+)/ ) {
35                         $redis->sadd('CPE.error.no-ip' => $username);
36                         next;
37                 }
38                 my $ip = $1;
39
40                 if ( $entry->{dhcpOption} !~ m/vendor-class-identifier\s\"([^"]+)\"/ ) {
41                         $redis->sadd('CPE.error.vendor' => $username);
42                         next;
43                 }
44
45                 my $vendor = $1;
46
47                 $taskset->add_task('CPE_ping', "$ip $username $vendor", {
48                         on_complete => sub { push @$results, ${$_[0]} }
49                 });
50                 $redis->sadd('CPE.queued' => $ip);
51
52                 $redis->sadd("_CPE.$vendor" => $ip); # FIXME
53         }
54
55         warn "# wait";
56         $taskset->wait;
57
58         my $finish = $self->datetime_now;
59         $redis->set( 'CPE.finish' => $finish );
60
61         my $poll;
62         foreach my $k ( $redis->keys('CPE.*') ) {
63                 my $n = $k;
64                 $n =~ s/^poll\.//;
65                 $n =~ s/\./_/g;
66                 $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
67         }
68
69         warn "# poll = ",dump $poll;
70         $self->do_background_json('Store_insert', { _table => 'CPE_poll', %$poll });
71
72         warn "# results = ", dump $results;
73         return $poll;
74 }
75
76 sub ping : Job : Decode(d_array) : Encode(e_json) {
77         my ( $self, $job, $workload ) = @_;
78
79         my ( $ip, $username, $vendor ) = @$workload;
80
81         my $redis = Redis->new;
82
83         return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
84
85         my $p = Net::Ping->new;
86         $p->hires;
87
88         my ( $ok, $rtt, $ping_ip ) = $p->ping( $ip );
89
90         if ( $ok ) {
91                 $redis->sadd( 'CPE.ping.ok' => $ip );
92         } else {
93                 $redis->sadd( 'CPE.ping.error' => $ip );
94                 return { error => "ping $ip" };
95         }
96
97         $self->do_background_json( 'Store_insert', {
98                  _table => 'ping',
99                 username => $username,
100                 timestamp => $self->datetime_now,
101                 ip => $ping_ip,
102                 rtt => $rtt,
103         });
104
105         if ( $vendor =~ m/SAMSUNG/ ) {
106                 $redis->sadd( 'CPE.vendor.SAMSUNG' => $username );
107                 $self->do_background( 'Davolink_info', "$ip $username adsl" );
108         } elsif ( $vendor =~ m/zte/ ) {
109                 $redis->sadd( 'CPE.vendor.ZTE' => $username );
110                 $self->do_background( 'EasyGateway_info', $ip );
111         } else {
112                 $redis->sadd( 'CPE.vendor.error' => $username );
113         }
114
115         return { ip => $ip, rtt => $rtt };
116 }
117
118 1;