3 use base qw(Gearman::Driver::Worker);
5 use Time::HiRes qw(time);
6 use Data::Dump qw(dump);
10 with 'APKPM::Gearman::Client';
11 with 'APKPM::Gearman';
15 sub poll_prefix : Job : Encode(e_json) {
16 my ( $self, $job, $workload ) = @_;
18 my $redis = Redis->new;
19 $redis->del( $_ ) foreach $redis->keys('poll.*');
21 my $start = $self->datetime_now;
23 $redis->set( 'poll.start' => $start );
25 my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
28 foreach my $entry ( @$entries ) {
29 $ip_username->{ $entry->{cn} } = $1 if $entry->{dhcpStatements} =~ m/fixed-address\s+(\S+)/;
32 my $taskset = $self->gc->new_task_set;
35 while (my ($username,$ip) = each %$ip_username) {
36 $taskset->add_task('poll_ip_username', "$ip $username", {
37 on_complete => sub { push @$results, ${$_[0]} }
39 $redis->sadd('poll.queued' => $ip);
48 my $finish = $self->datetime_now;
49 $redis->set( 'poll.finish' => $finish );
52 foreach my $k ( $redis->keys('poll.*') ) {
56 $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
59 warn "# poll = ",dump $poll;
60 $self->do_background_json('Store_insert', { _table => 'poll', %$poll });
62 warn "# results = ", dump $results;
66 sub poll_ip_username : Job : Decode(d_array) : Encode(e_json) {
67 my ( $self, $job, $workload ) = @_;
69 my ( $ip, $username ) = @$workload;
71 return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
73 my $p = Net::Ping->new;
76 my $redis = Redis->new;
78 my ( $ok, $rtt, $ping_ip ) = $p->ping( $ip );
81 $redis->sadd( 'poll.ping.ok' => $ip );
83 $redis->sadd( 'poll.ping.error' => $ip );
84 return { error => "ping $ip" };
87 $self->do_background_json( 'Store_insert', {
89 username => $username,
90 timestamp => $self->datetime_now,
95 $self->do_background( 'Davolink_info', "$ip $username adsl" );
97 return { ip => $ip, rtt => $rtt };