# Worker package used by this script. It extends Gearman::Driver::Worker::Base
# and pulls in APKPM::Gearman and APKPM::Gearman::Client via `with`
# (presumably Moose roles; the `use Moose` line is not visible in this excerpt).
3 package Cog; # persistent Gearman Worker - see the pun?
7 extends 'Gearman::Driver::Worker::Base';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
18 use Data::Dump qw(dump);
20 use Regexp::Common qw(net);
# Device variant to poll: first command-line argument, defaulting to
# 'ZTEDSLAM'. Because `||` is used, an empty string or '0' argument also
# falls back to the default.
27 my $variant = $ARGV[0] || 'ZTEDSLAM';
28 warn "variant: $variant\n";
# Gearman::Driver instance that hosts the workers registered below. Only the
# job-server address and logfile options are visible in this excerpt.
30 my $driver = Gearman::Driver->new(
31 server => 'localhost:4730',
34 logfile => 'log/persistant.log',
# Instantiate a worker object ($w1 is defined on lines not visible here) so
# that its do() method can be used to call other Gearman functions.
38 my $worker = $w1->new();
# Fetch the CRM records for this variant; the result is iterated as an array
# reference of hash references below.
# NOTE(review): the query key is spelled TIP_UREDJAJA while the record field
# is IP_UREDAJA -- presumably both match the CRM schema; confirm.
41 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
# Build $poll->{ip}{port} occurrence counts, skipping records whose
# IP_UREDAJA does not contain an IPv4 address (the match is unanchored).
46 foreach my $user ( @$crm ) {
47 next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
48 $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
# Debug output of the collected ip/port map.
51 warn "# poll = ", dump($poll);
# For every device IP collected in $poll, set up one Gearman function backed
# by its own H1::<variant> object.
53 foreach my $ip ( keys %$poll ) {
# Gearman function name: "<variant>_<ip>".
55 my $method = $variant . '_' . $ip;
# Device class name is derived from the variant; one instance per IP.
57 my $module = 'H1::' . $variant;
58 my $zte = $module->new( ip => $ip );
61 max_processes => 1, # FIXME increase?
# Job handler. $crm is the job payload; judging by the keys read below it is
# presumably a decoded CRM user record -- confirm against the caller.
72 my ( $self, $job, $crm ) = @_;
# A payload containing a `logout` key is handled separately (that branch's
# body is not visible in this excerpt).
74 if ( exists $crm->{logout} ) {
# Query the device for this port; eval traps any exception thrown by hash().
80 eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
82 my $redis = Redis->new;
# Error path: record the eval error in the "<variant>.error" set and return
# it to the caller.
# NOTE(review): $@ is read after Redis->new; if that constructor runs an eval
# internally, $@ could be clobbered -- consider capturing it right after the
# eval above. The guarding condition is not visible here; confirm.
85 $redis->sadd("$variant.error" => $@);
86 return { error => $@ };
# Success path: remember the username in the "<variant>.ok" set.
88 $redis->sadd("$variant.ok" => $crm->{USERNAME});
# Queue a background Store_insert job carrying the collected values.
91 $self->do_background_json( 'Store_insert', {
93 ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
94 username => $crm->{USERNAME},
95 timestamp => $self->datetime_now,
96 h => $self->to_hstore($hash),
99 # return $hash; # body end
106 warn "$method added\n";
# "poll_<variant>" job. The name appears twice in this excerpt; the
# surrounding structure of both occurrences is not visible.
114 name => "poll_$variant",
116 name => "poll_$variant",
# Job handler: re-reads the CRM list, validates every record, queues one
# "<variant>_<ip>" task per valid ip/port pair and returns a summary string.
119 my ( $self, $job, $workload ) = @_;
121 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
# Dedicated Gearman client and task set; uses the worker's own server setting
# when it is true, otherwise localhost:4730.
125 my $gc = Gearman::Client->new;
126 $gc->job_servers( $self->server || 'localhost:4730' );
127 my $taskset = $gc->new_task_set;
# Reset bookkeeping: delete every Redis key matching "<variant>.*", then
# record the start time.
130 my $redis = Redis->new;
131 $redis->del( $_ ) foreach $redis->keys("$variant.*");
133 $redis->set( "$variant.start" => $self->datetime_now );
# Classify each CRM record; rejected ones are recorded in
# "<variant>.invalid.*" sets.
135 foreach my $user ( @$crm ) {
136 my $ip = $user->{IP_UREDAJA};
137 my $port = $user->{SHELF_SLOT_PORT};
# Reject records whose IP does not contain an IPv4 address.
139 if ( $ip !~ /$RE{net}{IPv4}/ ) {
140 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
# Reject ports that do not contain digits followed by one or more "/digits"
# groups (the pattern is unanchored).
142 } elsif ( $port !~ m{\d+(/\d+)+} ) {
143 $redis->sadd("$variant.invalid.PORT" => $port);
# Reject repeated ip/port pairs. The post-increment also registers the pair
# in $seen, so $seen only ever holds IPs that passed the two checks above.
# ($seen is declared on a line not visible here.)
145 } elsif ( $seen->{ $ip }->{ $port }++ ) {
146 $redis->sadd("$variant.invalid.duplicate" => "$ip $port");
# Remaining records (presumably the `else` branch; that line is not visible)
# are marked as queued and counted per IP.
149 $redis->sadd("$variant.queued" => "$ip $port");
150 $redis->incr("$variant.ip.$ip");
# Dispatch to the per-IP function "<variant>_<ip>" with the record encoded by
# e_json; the callbacks record the outcome in Redis.
152 my $name = $variant . '_' . $ip;
153 $taskset->add_task( $name, $self->e_json( $user ), {
154 on_complete => sub { $redis->sadd("$variant.complete", "$ip $port") },
155 on_fail => sub { $redis->sadd("$variant.fail", "$ip $port") },
# After all polls are queued, add one logout task for every IP in $seen.
159 warn "# queue logouts";
160 foreach my $ip ( keys %$seen ) {
161 $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
# Record the finish time (presumably after waiting on the task set; that call
# is not visible in this excerpt).
167 $redis->set( "$variant.finish" => $self->datetime_now );
168 warn "# seen ", dump($seen);
# Summary returned to the job's caller: number of distinct IPs in $seen.
170 my $ips = scalar keys %$seen;
171 return "polled $ips IPs"; # body end