3 package Cog; # persistant Gearman Worker - see the pun?
7 extends 'Gearman::Driver::Worker::Base';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
18 use Data::Dump qw(dump);
20 use Regexp::Common qw(net);
25 my $driver = Gearman::Driver->new(
26 server => 'localhost:4730',
29 logfile => 'log/persistant.log',
33 my $worker = $w1->new();
36 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
40 foreach my $user ( @$crm ) {
41 next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
42 $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
45 warn "# poll = ", dump($poll);
47 foreach my $ip ( keys %$poll ) {
49 my $method = "ZTEDSLAM_$ip";
51 my $zte = H1::ZTEDSLAM->new( ip => $ip );
54 max_processes => 1, # FIXME increase?
65 my ( $self, $job, $crm ) = @_;
67 eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
69 return { error => $@ } if $@;
71 $self->do_background_json( 'Store_insert', {
73 ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
74 username => $crm->{USERNAME},
75 timestamp => $self->datetime_now,
76 h => $self->to_hstore($hash),
79 # return $hash; # body end
86 warn "$method added\n";
95 name => 'poll_ZTEDSLAM',
98 my ( $self, $job, $workload ) = @_;
100 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
104 my $taskset = $worker->gc->new_task_set;
107 foreach my $user ( @$crm ) {
108 my $ip = $user->{IP_UREDAJA};
109 next unless $ip =~ /$RE{net}{IPv4}/;
110 my $name = 'ZTEDSLAM_' . $ip;
111 $taskset->add_task( $name, $self->e_json( $user ), {
112 on_complete => sub { "# $name done", $/ }
114 if ! $seen->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
119 warn "# seen ", dump($seen);