modify schema for new CPE stats
[APKPM.git] / lib / APKPM / Poll.pm
1 package APKPM::Poll;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes qw(time);
6 use Data::Dump qw(dump);
7 use Redis;
8 use Net::Ping;
9
10 with 'APKPM::Gearman::Client';
11 with 'APKPM::Gearman';
12
13 sub prefix { '' }
14
15 sub poll_prefix : Job : Encode(e_json) {
16         my ( $self, $job, $workload ) = @_;
17
18         my $redis = Redis->new;
19         $redis->del( $_ ) foreach $redis->keys('poll.*');
20
21         my $start = $self->datetime_now;
22
23         $redis->set( 'poll.start' => $start );
24
25         my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
26
27         my $ip_username;
28         foreach my $entry ( @$entries ) {
29                 $ip_username->{ $entry->{cn} } = $1 if $entry->{dhcpStatements} =~ m/fixed-address\s+(\S+)/;
30         }
31
32         my $taskset = $self->gc->new_task_set;
33         my $results;
34
35         while (my ($username,$ip) = each %$ip_username) {
36                 $taskset->add_task('poll_ip_username', "$ip $username", {
37                         on_complete => sub { push @$results, ${$_[0]} }
38                 });
39                 $redis->sadd('poll.queued' => $ip);
40         }
41
42         undef $entries;
43         undef $ip_username;
44
45         warn "# wait";
46         $taskset->wait;
47
48         my $finish = $self->datetime_now;
49         $redis->set( 'poll.finish' => $finish );
50
51         my $poll;
52         foreach my $k ( $redis->keys('poll.*') ) {
53                 my $n = $k;
54                 $n =~ s/^poll\.//;
55                 $n =~ s/\./_/g;
56                 $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
57         }
58
59         warn "# poll = ",dump $poll;
60         $self->do_background_json('Store_insert', { _table => 'poll', %$poll });
61
62         warn "# results = ", dump $results;
63         return $poll;
64 }
65
66 sub poll_ip_username : Job : Decode(d_array) : Encode(e_json) {
67         my ( $self, $job, $workload ) = @_;
68
69         my ( $ip, $username ) = @$workload;
70
71         return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
72
73         my $p = Net::Ping->new;
74         $p->hires;
75
76         my $redis = Redis->new;
77         
78         my ( $ok, $rtt, $ping_ip ) = $p->ping( $ip );
79
80         if ( $ok ) {
81                 $redis->sadd( 'poll.ping.ok' => $ip );
82         } else {
83                 $redis->sadd( 'poll.ping.error' => $ip );
84                 return { error => "ping $ip" };
85         }
86
87         $self->do_background_json( 'Store_insert', {
88                  _table => 'ping',
89                 username => $username,
90                 timestamp => $self->datetime_now,
91                 ip => $ping_ip,
92                 rtt => $rtt,
93         });
94
95         $self->do_background( 'Davolink_info', "$ip $username adsl" );
96
97         return { ip => $ip, rtt => $rtt };
98 }
99
100 1;