use do_high for CRM_search
[APKPM.git] / lib / APKPM / CPE.pm
1 package APKPM::CPE;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes qw(time);
6 use Data::Dump qw(dump);
7 use Redis;
8 use Net::Ping;
9
10 use lib 'lib';
11 use H1::EasyGateway;
12 use H1::Davolink;
13
14 with 'APKPM::Gearman::Client';
15 with 'APKPM::Gearman';
16
17 sub prefix { 'CPE_' }
18
19 sub poll : Job : Encode(e_json) {
20         my ( $self, $job, $workload ) = @_;
21
22         my $redis = Redis->new;
23         $redis->del( $_ ) foreach $redis->keys('CPE.*');
24
25         my $start = $self->datetime_now;
26
27         $redis->set( 'CPE.start' => $start );
28
29         my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
30
31         my $taskset = $self->gc->new_task_set;
32         my $results;
33
34         foreach my $entry ( @$entries ) {
35
36                 my $username = $entry->{cn} || die "no cn in ", dump($entry);
37
38                 if ( $entry->{dhcpStatements} !~ m/fixed-address\s+(\S+)/ ) {
39                         $redis->sadd('CPE.error.no-ip' => $username);
40                         next;
41                 }
42                 my $ip = $1;
43
44                 if ( $entry->{dhcpOption} !~ m/vendor-class-identifier\s\"([^"]+)\"/ ) {
45                         $redis->sadd('CPE.error.vendor' => $username);
46                         next;
47                 }
48
49                 my $vendor = $1;
50
51                 $taskset->add_task('CPE_info', "$ip $username $vendor", {
52                         on_complete => sub { push @$results, ${$_[0]} }
53                 });
54                 $redis->sadd('CPE.queued' => $ip);
55
56                 $vendor =~ s/\s+/_/g; # sadd dies on spaces in keys
57                 $redis->sadd("CPE.vendor.$vendor" => $ip); # FIXME
58         }
59
60         warn "# wait";
61         $taskset->wait;
62
63         my $finish = $self->datetime_now;
64         $redis->set( 'CPE.finish' => $finish );
65
66         my $poll;
67         foreach my $k ( $redis->keys('CPE.*') ) {
68                 my $n = $k;
69                 $n =~ s/^CPE\.//;
70                 $n =~ s/\./_/g;
71                 $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
72         }
73
74         warn "# poll = ",dump $poll;
75         $self->do_background_json('Store_insert', { _table => 'CPE_poll', %$poll });
76
77         warn "# results = ", dump $results;
78         return $poll;
79 }
80
81 sub info : Job : Decode(d_array) : Encode(e_json) {
82         my ( $self, $job, $workload ) = @_;
83
84         my ( $ip, $username, $vendor ) = @$workload;
85         
86         my ( $ret );
87
88         my $redis = Redis->new;
89
90         return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
91
92         my $p = Net::Ping->new;
93         $p->hires;
94
95         my ( $ok, $rtt, $ping_ip ) = $p->ping( $ip );
96
97         if ( $ok ) {
98                 $redis->sadd( 'CPE.ping.ok' => $ip );
99         } else {
100                 $redis->sadd( 'CPE.ping.error' => $ip );
101                 return { error => "ping $ip" };
102         }
103
104         $self->do_background_json( 'Store_insert', {
105                  _table => 'ping',
106                 username => $username,
107                 timestamp => $self->datetime_now,
108                 ip => $ping_ip,
109                 rtt => $rtt,
110         });
111
112         if ( $vendor =~ m/SAMSUNG/ ) {
113                 $vendor = 'Davolink';
114         } elsif ( $vendor =~ m/zte/ ) {
115                 $vendor = 'EasyGateway';
116         } else {
117                 $redis->sadd( 'CPE.skipped' => $username );
118                 $vendor = undef;
119         }
120
121         if ( $vendor ) {
122                 $redis->sadd( "CPE.$vendor.queued" => $ip );
123
124                 my $module = 'H1::' . $vendor;
125                 $ret = eval { $module->info( $ip ) };
126
127                 warn "# $module ",dump($ret);
128
129                 if ( $ret ) {
130                         $self->do_background_json( 'Store_insert', {
131                                 _table => 'cpe_' . $vendor,
132                                 ip => $ip,
133                                 username => $username,
134                                 timestamp => $self->datetime_now,
135                                 h => $ret,
136                         });
137                         $redis->sadd( "CPE.$vendor.ok" => $ip );
138                 } else {
139                         $redis->sadd( "CPE.$vendor.error" => $ip );
140                 }
141         }
142
143         return { ip => $ip, rtt => $rtt, h => $ret };
144 }
145
146 1;