3 package Cog; # persistant Gearman Worker - see the pun?
7 extends 'Gearman::Driver::Worker::Base';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
18 use Data::Dump qw(dump);
20 use Regexp::Common qw(net);
26 my $driver = Gearman::Driver->new(
27 server => 'localhost:4730',
30 logfile => 'log/persistant.log',
34 my $worker = $w1->new();
37 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
42 foreach my $user ( @$crm ) {
43 next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
44 $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
47 warn "# poll = ", dump($poll);
49 foreach my $ip ( keys %$poll ) {
51 my $method = "ZTEDSLAM_$ip";
53 my $zte = H1::ZTEDSLAM->new( ip => $ip );
56 max_processes => 1, # FIXME increase?
67 my ( $self, $job, $crm ) = @_;
69 if ( exists $crm->{logout} ) {
75 eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
77 my $redis = Redis->new;
80 $redis->sadd("ZTEDSLAM.error" => $@);
81 return { error => $@ };
83 $redis->sadd("ZTEDSLAM.ok" => $crm->{USERNAME});
86 $self->do_background_json( 'Store_insert', {
88 ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
89 username => $crm->{USERNAME},
90 timestamp => $self->datetime_now,
91 h => $self->to_hstore($hash),
94 # return $hash; # body end
101 warn "$method added\n";
109 name => 'poll_ZTEDSLAM',
111 name => 'poll_ZTEDSLAM',
114 my ( $self, $job, $workload ) = @_;
116 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
120 my $gc = Gearman::Client->new;
121 $gc->job_servers( $self->server || 'localhost:4730' );
122 my $taskset = $gc->new_task_set;
125 my $redis = Redis->new;
126 $redis->del( $_ ) foreach $redis->keys("ZTEDSLAM.*");
128 $redis->set( "ZTEDSLAM.start" => $self->datetime_now );
130 foreach my $user ( @$crm ) {
131 my $ip = $user->{IP_UREDAJA};
132 my $port = $user->{SHELF_SLOT_PORT};
134 if ( $ip !~ /$RE{net}{IPv4}/ ) {
135 $redis->sadd("ZTEDSLAM.invalid.IP_UREDAJA" => $ip);
137 } elsif ( $port !~ m{\d+(/\d+)+} ) {
138 $redis->sadd("ZTEDSLAM.invalid.PORT" => $port);
141 $redis->sadd("ZTEDSLAM.queued" => "$ip $port");
143 my $name = 'ZTEDSLAM_' . $ip;
144 $taskset->add_task( $name, $self->e_json( $user ), {
145 on_complete => sub { $redis->sadd("ZTEDSLAM.complete", "$ip $port") },
146 on_fail => sub { $redis->sadd("ZTEDSLAM.fail", "$ip $port") },
148 if ! $seen->{ $ip }->{ $port }++;
151 warn "# queue logouts";
152 foreach my $ip ( keys %$seen ) {
153 $taskset->add_task( "ZTEDSLAM_$ip", $self->e_json( { logout => 1 } ) );
159 $redis->set( "ZTEDSLAM.finish" => $self->datetime_now );
160 warn "# seen ", dump($seen);
162 my $ips = scalar keys %$seen;
163 return "polled $ips IPs"; # body end