set min_processes to 0 for poll_*
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
3 package Cog; # persistant Gearman Worker - see the pun?
4
5 use Moose;
6 use JSON::XS qw();
7 extends 'Gearman::Driver::Worker::Base';
8
9 use lib 'lib';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
12
13 sub prefix { '' }
14
15 package main;
16
17 use Gearman::Driver;
18 use Data::Dump qw(dump);
19
20 use Regexp::Common qw(net);
21 use Redis;
22
23 use lib 'lib';
24 use H1::ZTEDSLAM;
25 use H1::ZTEMSAN;
26
27 my ($variant,$max_processes) = @ARGV;
28 $variant ||= 'ZTEDSLAM';
29 $max_processes ||= 1;
30 warn "variant: $variant max_processes: $max_processes\n";
31
32 my $driver = Gearman::Driver->new(
33         server   => 'localhost:4730',
34         interval => 60,
35         loglevel   => 'DEBUG',
36         logfile    => "log/$variant.log",
37 ) || die $!;
38
39 my $w1 = 'Cog';
40 my $worker = $w1->new();
41
42
43 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
44
45
46 my $poll;
47
48 foreach my $user ( @$crm ) {
49         next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
50         $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
51 }
52
53 warn "# poll = ", dump($poll);
54
55 foreach my $ip ( keys %$poll ) {
56
57         my $method = $variant . '_' . $ip;
58
59         my $module = 'H1::' . $variant;
60         my $zte = $module->new( ip => $ip );
61
62         my $ports = keys %{ $poll->{$ip} };
63         my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
64         $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
65
66         $driver->add_job({
67                 max_processes => $max_procs,
68                 min_processes => 0,
69                 name            => $method,
70                 worker          => $worker,
71                 methods    => [
72                         {
73                                 decode => 'd_json',
74 #                               encode => 'e_json',
75                                 name   => $method,
76                                 body   => sub {
77
78 # XXX -- worker body
79
80                 my ( $self, $job, $crm ) = @_;
81
82                 my $redis = Redis->new;
83
84                 if ( exists $crm->{logout} ) {
85                         $zte->logout;
86                         $redis->sadd("$variant.$ip.logout", $self->datetime_now);
87                         return "logout ok"; # XXX return scalar!
88                 }
89
90                 my $port     = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
91                 my $username = $crm->{USERNAME}        || die "no USERNAME";
92
93                 my $hash;
94                 eval { $hash = $zte->hash( $port ) };
95
96                 if ( $@ ) {
97                         $redis->sadd("$variant.$ip.error" => $@);
98                         return "error: $@";
99                 } elsif ( ! $hash ) {
100                         $redis->sadd("$variant.$ip.empty" => $port);
101                         return "empty";
102                 } else {
103                         $redis->sadd("$variant.$ip.ok" => $port);
104                 }
105
106                 $self->do_background_json( 'Store_insert', {
107                         _table => 'dslam_h',
108                         ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
109                         username => $username,
110                         timestamp => $self->datetime_now,
111                         h => $self->to_hstore($hash),
112                 });
113
114                 return "ok $username $ip $port"; # body end
115
116 # XXX -- worker body
117                                 },
118                         },
119                 ]
120         });
121
122         warn "$method for $ip with $ports ports using $max_procs processes\n";
123
124 }
125
126 $driver->add_job({
127         max_processes => 1,
128         min_processes => 0,
129         worker          => $worker,
130         name            => "poll_$variant",
131         methods    => [ {
132                 name            => "poll_$variant",
133                 body   => sub {
134
135                 my ( $self, $job, $workload ) = @_;
136
137                 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
138
139                 warn "# crm $crm";
140
141                 my $gc = Gearman::Client->new;
142                 $gc->job_servers( $self->server || 'localhost:4730' );
143                 my $taskset = $gc->new_task_set;
144                 my $seen;
145
146                 my $redis = Redis->new;
147                 $redis->del( $_ ) foreach $redis->keys("$variant.*");
148
149                 $redis->set( "$variant.poll.start" => $self->datetime_now );
150
151                 foreach my $user ( @$crm ) {
152                         # required CRM fields for worker
153                         my $ip   = $user->{IP_UREDAJA}      || next;
154                         my $port = $user->{SHELF_SLOT_PORT} || next;
155                         my $username = $user->{USERNAME}    || next;
156
157                         if ( $ip !~ /$RE{net}{IPv4}/ ) {
158                                 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
159                                 next;
160                         } elsif ( $port !~ m{\d+(/\d+)+} ) {
161                                 $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
162                                 next;
163                         } elsif ( ! $username ) {
164                                 $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
165                                 next;
166                         } elsif ( $seen->{ $ip }->{ $port }++ ) {
167                                 $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
168                                 next;
169                         }
170
171                         $redis->incr("$variant.poll.queued");
172                         $redis->sadd("$variant.$ip.queued" => $port);
173
174                         my $name = $variant . '_' . $ip;
175                         $taskset->add_task( $name, $self->e_json( $user ), {
176                                 on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
177                                 on_fail     => sub { $redis->sadd("$variant.$ip.failed" => $port) },
178                         } )
179                 }
180
181                 warn "# queue logouts";
182                 foreach my $ip ( keys %$seen ) {
183                         $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
184                 }
185
186                 warn "# wait";
187                 $taskset->wait;
188
189                 $redis->set( "$variant.poll.finish" => $self->datetime_now );
190                 warn "# seen ", dump($seen);
191
192                 my $ips = scalar keys %$seen;
193                 return "polled $ips IPs"; # body end
194                 }
195         } ],
196 });
197
198 $driver->run;
199