+$driver->add_job({
+ max_processes => 1,
+ min_processes => 0,
+ worker => $worker,
+ name => $variant . '_poll',
+ methods => [ {
+ name => $variant . '_poll',
+ encode => 'e_json',
+ body => sub {
+
+ my ( $self, $job, $workload ) = @_;
+
+ my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
+
+ warn "# crm $crm";
+
+ my $gc = Gearman::Client->new;
+ $gc->job_servers( $self->server || 'localhost:4730' );
+ my $taskset = $gc->new_task_set;
+ my $seen;
+
+ my $redis = Redis->new;
+ $redis->del( $_ ) foreach $redis->keys("$variant.*");
+
+ $redis->set( "$variant.poll.start" => $self->datetime_now );
+
+ foreach my $user ( @$crm ) {
+ # required CRM fields for worker
+ my $ip = $user->{IP_UREDAJA} || next;
+ my $port = $user->{SHELF_SLOT_PORT} || next;
+ my $username = $user->{USERNAME} || next;
+
+ if ( $ip !~ /$RE{net}{IPv4}/ ) {
+ $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
+ next;
+ } elsif ( $port !~ m{\d+(/\d+)+} ) {
+ $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
+ next;
+ } elsif ( ! $username ) {
+ $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
+ next;
+ } elsif ( $seen->{ $ip }->{ $port }++ ) {
+ $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
+ next;
+ }
+
+ $redis->incr("$variant.poll.queued");
+ $redis->sadd("$variant.$ip.queued" => $port);
+
+ my $name = $variant . '_' . $ip;
+ $taskset->add_task( $name, $self->e_json( $user ), {
+ on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
+ on_fail => sub { $redis->sadd("$variant.$ip.failed" => $port) },
+ } )
+ }
+
+ warn "# queue logouts";
+ foreach my $ip ( keys %$seen ) {
+ $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
+ }
+
+ warn "# wait";
+ $taskset->wait;
+
+ $redis->set( "$variant.poll.finish" => $self->datetime_now );
+
+ my $status = APKPM::Model->redis_status;
+ my $row = $status->{poll}->{$variant};
+ $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
+ $row->{variant} = $variant;
+ $self->do_background_json('Store_insert', { _table => 'dslam_poll', %$row });
+ warn "# dslam_poll ",dump($row);
+
+ my $ips = scalar keys %$seen;
+ my $ports = 0;
+ $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
+ warn "# finish $ips ips with $ports ports\n";
+ return { ips => $ips, ports => $ports, dslam_poll => $row } ; # body end
+ }
+ } ],
+});
+
+open(my $pid, '>', "/tmp/apkpm.$variant.pid");
+print $pid "$$\n";
+close $pid;
+