# Package Cog: declared as an extension of Gearman::Driver::Worker::Base, with
# APKPM::Gearman and APKPM::Gearman::Client applied through `with`.
3 package Cog; # persistent Gearman Worker - see the pun?
7 extends 'Gearman::Driver::Worker::Base';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
18 use Data::Dump qw(dump);
20 use Regexp::Common qw(net);
# Command-line arguments: the device variant and the max_processes value that
# is handed to the per-IP job registration further down. Only $variant gets a
# default.
27 my ($variant,$max_processes) = @ARGV;
28 $variant ||= 'ZTEDSLAM';
# NOTE(review): $max_processes is interpolated here even when no second
# argument was supplied, in which case it is undef.
30 warn "variant: $variant max_processes: $max_processes\n";
# Driver configured with server localhost:4730 and a per-variant log file.
32 my $driver = Gearman::Driver->new(
33 server => 'localhost:4730',
36 logfile => "log/$variant.log",
# NOTE(review): $w1 is not defined in the lines visible here.
40 my $worker = $w1->new();
# Ask the 'CRM_search' function for the records of this variant.
# NOTE(review): the query key is spelled TIP_UREDJAJA while the records below
# are read through IP_UREDAJA - confirm both spellings match the CRM schema.
43 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
# Build $poll->{ip}->{port} occurrence counts from the CRM records.
# Records whose IP_UREDAJA does not match the IPv4 pattern are skipped; the
# pattern is unanchored, so any value containing an IPv4-looking substring
# passes.
48 foreach my $user ( @$crm ) {
49 next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
50 $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
# Debug dump of the resulting table.
53 warn "# poll = ", dump($poll);
# For every device IP collected in $poll: build the function name
# "<variant>_<ip>", instantiate the device module H1::<variant> for that IP,
# and set up the handler below.
# NOTE(review): presumably the handler is registered with $driver; the
# registration call itself is not visible here - confirm.
55 foreach my $ip ( keys %$poll ) {
57 my $method = $variant . '_' . $ip;
# NOTE(review): H1::<variant> must already be loaded; no require/use for it is
# visible in these lines.
59 my $module = 'H1::' . $variant;
60 my $zte = $module->new( ip => $ip );
63 max_processes => $max_processes,
# Handler arguments: $self, $job, and $crm. $crm is used as a hash reference
# below and shadows the file-level $crm.
76 my ( $self, $job, $crm ) = @_;
# A new Redis object is created on every invocation of the handler.
78 my $redis = Redis->new;
# A workload containing the key `logout` is a logout request: record the
# current timestamp in the set "<variant>.<ip>.logout" and return early.
80 if ( exists $crm->{logout} ) {
82 $redis->sadd("$variant.$ip.logout", $self->datetime_now);
83 return { logout => 'ok' };
# Both fields are required. `||` also rejects false values such as "0" or "".
86 my $port = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
87 my $username = $crm->{USERNAME} || die "no USERNAME";
# Invalid-port path (its condition is not visible here): remember the username
# in the set "<variant>.invalid.SHELF_SLOT_PORT" and return a message.
90 $redis->sadd("$variant.invalid.SHELF_SLOT_PORT" => $username );
91 return "invalid port $port";
# Query the device for this port inside eval so that a die is caught and left
# in $@ instead of propagating.
95 eval { $hash = $zte->hash( $port ) };
# Record the outcome per device IP in one of three sets: `error` (stores $@),
# `empty` (stores the port when $hash is false), or `ok` (stores the port).
# NOTE(review): the condition guarding the `error` branch is not visible here.
98 $redis->sadd("$variant.$ip.error" => $@);
100 } elsif ( ! $hash ) {
101 $redis->sadd("$variant.$ip.empty" => $port);
104 $redis->sadd("$variant.$ip.ok" => $port);
# Hand the collected values to 'Store_insert' through do_background_json:
# management IP, username, current timestamp and $hash passed through
# to_hstore.
107 $self->do_background_json( 'Store_insert', {
109 ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
110 username => $username,
111 timestamp => $self->datetime_now,
112 h => $self->to_hstore($hash),
# Status string returned as the handler result.
115 return "ok $username $ip $port"; # body end
# Log that setup for this IP is done.
123 warn "$method added\n";
# The name "poll_<variant>" is given twice.
# NOTE(review): presumably once for the job and once for its method entry; the
# enclosing call is not visible here - confirm.
131 name => "poll_$variant",
133 name => "poll_$variant",
# Handler arguments: $self, $job, $workload. $workload is not used in the
# visible lines.
136 my ( $self, $job, $workload ) = @_;
# Fetch the CRM records for this variant again on every run.
138 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
# Gearman client and task set used to queue the per-port tasks; falls back to
# localhost:4730 when $self->server is false.
142 my $gc = Gearman::Client->new;
143 $gc->job_servers( $self->server || 'localhost:4730' );
144 my $taskset = $gc->new_task_set;
# Delete every Redis key returned for the pattern "<variant>.*" (one del call
# per key), then record the start time.
147 my $redis = Redis->new;
148 $redis->del( $_ ) foreach $redis->keys("$variant.*");
150 $redis->set( "$variant.time.start" => $self->datetime_now );
# Classify every CRM record. The first matching case wins:
#   - IP_UREDAJA does not match the IPv4 pattern -> set "<variant>.invalid.IP_UREDAJA"
#   - port does not contain digits/digits        -> set "<variant>.<ip>.invalid.PORT"
#   - ip/port pair already seen                  -> set "<variant>.<ip>.duplicate"
#   - NOTE(review): the remaining lines presumably form the final else branch,
#     which queues the record; the branch boundary is not visible here.
# Both patterns are unanchored, so they only require a matching substring.
# NOTE(review): $seen is not declared in the lines visible here.
152 foreach my $user ( @$crm ) {
153 my $ip = $user->{IP_UREDAJA};
154 my $port = $user->{SHELF_SLOT_PORT};
156 if ( $ip !~ /$RE{net}{IPv4}/ ) {
157 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
159 } elsif ( $port !~ m{\d+(/\d+)+} ) {
160 $redis->sadd("$variant.$ip.invalid.PORT" => $port);
162 } elsif ( $seen->{ $ip }->{ $port }++ ) {
163 $redis->sadd("$variant.$ip.duplicate" => $port);
# Queued record: bump the global counter and remember the port per IP.
166 $redis->incr("$variant.queued");
167 $redis->sadd("$variant.$ip.queued" => $port);
# The task name is built the same way as the per-IP function name
# "<variant>_<ip>". The record is passed through e_json as the task argument,
# and the callbacks record the port in the `complete` or `failed` set.
169 my $name = $variant . '_' . $ip;
170 $taskset->add_task( $name, $self->e_json( $user ), {
171 on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
172 on_fail => sub { $redis->sadd("$variant.$ip.failed" => $port) },
# After all records are processed, add one { logout => 1 } task per seen IP.
176 warn "# queue logouts";
177 foreach my $ip ( keys %$seen ) {
178 $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
# Record the finish time and dump the seen table for debugging.
184 $redis->set( "$variant.time.finish" => $self->datetime_now );
185 warn "# seen ", dump($seen);
# The result reports the number of distinct IPs in $seen.
187 my $ips = scalar keys %$seen;
188 return "polled $ips IPs"; # body end