3 use base qw(Gearman::Driver::Worker);
6 use Data::Dump qw(dump);
9 with 'APKPM::Gearman::Client';
10 with 'APKPM::Gearman';
12 sub prefix { 'poll_' }
14 sub by_prefix : Job : MinProcesses(1) : MaxProcesses(1) : Encode(e_json) {
15 my ( $self, $job, $workload ) = @_;
17 my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
20 foreach my $entry ( @$entries ) {
21 $ip_username->{ $entry->{cn} } = $1 if $entry->{dhcpStatements} =~ m/fixed-address\s+(\S+)/;
24 my $redis = Redis->new;
25 $redis->del( $_ ) foreach $redis->keys('poll.*');
27 my $taskset = $self->gc->new_task_set;
29 while (my ($username,$ip) = each %$ip_username) {
30 $taskset->add_task('poll_ip_username', "$ip $username", {
31 on_complete => sub { push @$results, ${$_[0]} }
33 $redis->sadd('poll.queued' => $ip);
39 return { ldap => $ip_username, results => $results };
42 sub ip_username : Job : MinProcesses(1) : MaxProcesses(25) : Decode(d_array) : Encode(e_json) {
43 my ( $self, $job, $workload ) = @_;
45 my ( $ip, $username ) = @$workload;
47 return { error => "invalid workload", expected => "ip username" } unless $ip && $username;
48 my $ping = $self->do('ping', $ip);
50 my $redis = Redis->new;
52 if ( exists $ping->{error} ) {
53 warn "XXX error: $ip";
54 $redis->sadd( 'poll.ping.error' => $ip );
57 $redis->sadd( 'poll.ping.ok' => $ip );
60 $ping->{username} = $username;
61 $self->do_background_json( 'Store_ping', $ping );
63 $self->do_background( 'Davolink_info', "$ip $username adsl" );