store each poll stats in postgres
[APKPM.git] / lib / APKPM / Poll.pm
1 package APKPM::Poll;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes qw(time);
6 use Data::Dump qw(dump);
7 use Redis;
8 use DateTime;
9 use DateTime::Format::Pg;
10
11 with 'APKPM::Gearman::Client';
12 with 'APKPM::Gearman';
13
14 sub prefix { 'poll_' }
15
16 sub by_prefix : Job : MinProcesses(1) : MaxProcesses(1) : Encode(e_json) {
17         my ( $self, $job, $workload ) = @_;
18
19         my $start = DateTime->now();
20
21         my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
22
23         my $ip_username;
24         foreach my $entry ( @$entries ) {
25                 $ip_username->{ $entry->{cn} } = $1 if $entry->{dhcpStatements} =~ m/fixed-address\s+(\S+)/;
26         }
27
28         my $redis = Redis->new;
29         $redis->del( $_ ) foreach $redis->keys('poll.*');
30
31         my $taskset = $self->gc->new_task_set;
32         my $results;
33         while (my ($username,$ip) = each %$ip_username) {
34                 $taskset->add_task('poll_ip_username', "$ip $username", {
35                         on_complete => sub { push @$results, ${$_[0]} }
36                 });
37                 $redis->sadd('poll.queued' => $ip);
38         }
39
40         warn "# wait";
41         $taskset->wait;
42
43         my $finish = DateTime->now();
44
45         my $poll;
46         foreach my $k ( $redis->keys('poll.*') ) {
47                 my $n = $k;
48                 $n =~ s/^poll\.//;
49                 $n =~ s/\./_/g;
50                 $poll->{$n} = $redis->scard($k);
51         }
52         $poll->{start}  = DateTime::Format::Pg->format_datetime($start);
53         $poll->{finish} = DateTime::Format::Pg->format_datetime($finish);
54
55         $poll->{_table} = 'poll';
56
57         warn dump $poll;
58         $self->do_background_json('Store_insert_table', $poll);
59
60         return { ldap => $ip_username, results => $results, duration => $finish - $start, poll => $poll };
61 }
62
63 sub ip_username : Job : MinProcesses(1) : MaxProcesses(25) : Decode(d_array) : Encode(e_json) {
64         my ( $self, $job, $workload ) = @_;
65
66         my ( $ip, $username ) = @$workload;
67
68         return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
69         my $ping = $self->do('ping', $ip);
70
71         my $redis = Redis->new;
72
73         if ( exists $ping->{error} ) {
74 warn "XXX error: $ip";
75                 $redis->sadd( 'poll.ping.error' => $ip );
76                 return $ping;
77         } else {
78                 $redis->sadd( 'poll.ping.ok' => $ip );
79         }
80
81         $ping->{username} = $username;
82         $self->do_background_json( 'Store_ping', $ping );
83
84         $self->do_background( 'Davolink_info', "$ip $username adsl" );
85
86         return $ping;
87 }
88
89 1;