Merge branch 'devel' of h1dev:/srv/APKPM/
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
3 BEGIN {
4         $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
5 }
6
7 package Cog; # persistant Gearman Worker - see the pun?
8
9 use Moose;
10 use JSON::XS qw();
11 extends 'Gearman::Driver::Worker::Base';
12
13 use lib 'lib';
14 with 'APKPM::Gearman';
15 with 'APKPM::Gearman::Client';
16
17 sub prefix { '' }
18
19 package main;
20
21 use Gearman::Driver;
22 use Data::Dump qw(dump);
23
24 use Regexp::Common qw(net);
25 use Redis;
26
27 use lib 'lib';
28 use H1::ZTEDSLAM;
29 use H1::ZTEMSAN;
30 use APKPM::Model;
31
32 my ($variant,$max_processes) = @ARGV;
33 $variant ||= 'ZTEDSLAM';
34 $max_processes ||= 1;
35 warn "variant: $variant max_processes: $max_processes\n";
36
37 my $driver = Gearman::Driver->new(
38         server   => 'localhost:4730',
39         interval => 3,
40         loglevel   => 'DEBUG',
41         logfile    => "log/$variant.log",
42 ) || die $!;
43
44 my $w1 = 'Cog';
45 my $worker = $w1->new();
46
47 $0 = "CRM_search TIP_UREDJAJA:$variant";
48
49 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
50
51 $0 = $variant;
52
53 my $poll;
54
55 foreach my $user ( @$crm ) {
56         next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
57         $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
58 }
59
60 warn "# poll = ", dump($poll);
61
62 foreach my $ip ( keys %$poll ) {
63
64         my $method = $variant . '_' . $ip;
65
66         my $module = 'H1::' . $variant;
67         my $zte = $module->new( ip => $ip );
68
69         my $ports = keys %{ $poll->{$ip} };
70         my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
71         $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
72
73         $driver->add_job({
74                 max_processes => $max_procs,
75                 min_processes => 1,
76                 name            => $method,
77                 worker          => $worker,
78                 methods    => [
79                         {
80                                 decode => 'd_json',
81                                 encode => 'e_json',
82                                 name   => $method,
83                                 body   => sub {
84
85 # XXX -- worker body
86
87                 my ( $self, $job, $crm ) = @_;
88
89                 my $redis = Redis->new;
90
91                 if ( exists $crm->{logout} ) {
92                         $zte->logout;
93                         $redis->sadd("$variant.$ip.logout", $self->datetime_now);
94                         return "logout ok"; # XXX return scalar!
95                 }
96
97                 my $port     = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
98                 my $username = $crm->{USERNAME}        || die "no USERNAME";
99
100                 $0 = "$method $port $username"; # process name
101
102                 my $hash;
103                 eval { $hash = $zte->hash( $port ) };
104
105                 if ( $@ ) {
106                         $redis->sadd("$variant.$ip.error" => $@);
107                         return { error => $@ };
108                 } elsif ( ! $hash ) {
109                         $redis->sadd("$variant.$ip.empty" => $port);
110                         return { error => 'empty' };
111                 } else {
112                         $redis->sadd("$variant.$ip.ok" => $port);
113                 }
114
115                 $self->do_background_json( 'Store_insert', {
116                         _table => 'dslam',
117                         ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
118                         username => $username,
119                         timestamp => $self->datetime_now,
120                         variant => $variant,
121                         h => $hash,
122                 });
123
124                 $0 = "$method idle";
125                 return $hash; # body end
126
127 # XXX -- worker body
128                                 },
129                         },
130                 ]
131         });
132
133         warn "$method for $ip with $ports ports using $max_procs processes\n";
134
135 }
136
137 $driver->add_job({
138         max_processes => 1,
139         min_processes => 0,
140         worker          => $worker,
141         name            => $variant . '_poll',
142         methods    => [ {
143                 name   => $variant . '_poll',
144                 encode => 'e_json',
145                 body   => sub {
146
147                 my ( $self, $job, $workload ) = @_;
148
149                 $0 = $variant . '_poll CRM_search';
150
151                 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
152
153                 warn "# crm $crm";
154
155                 my $gc = Gearman::Client->new;
156                 $gc->job_servers( $self->server || 'localhost:4730' );
157                 my $taskset = $gc->new_task_set;
158                 my $seen;
159
160                 my $redis = Redis->new;
161                 $redis->del( $_ ) foreach $redis->keys("$variant.*");
162
163                 $0 = $variant . '_poll start';
164                 $redis->set( "$variant.poll.start" => $self->datetime_now );
165
166                 foreach my $user ( @$crm ) {
167                         # required CRM fields for worker
168                         my $ip   = $user->{IP_UREDAJA}      || next;
169                         my $port = $user->{SHELF_SLOT_PORT} || next;
170                         my $username = $user->{USERNAME}    || next;
171
172                         if ( $ip !~ /$RE{net}{IPv4}/ ) {
173                                 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
174                                 next;
175                         } elsif ( $port !~ m{\d+(/\d+)+} ) {
176                                 $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
177                                 next;
178                         } elsif ( ! $username ) {
179                                 $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
180                                 next;
181                         } elsif ( $seen->{ $ip }->{ $port }++ ) {
182                                 $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
183                                 next;
184                         }
185
186                         $redis->incr("$variant.poll.queued");
187                         $redis->sadd("$variant.$ip.queued" => $port);
188
189                         my $name = $variant . '_' . $ip;
190                         $taskset->add_task( $name, $self->e_json( $user ), {
191                                 on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
192                                 on_fail     => sub { $redis->sadd("$variant.$ip.failed" => $port) },
193                         } )
194                 }
195
196                 warn "# queue logouts";
197                 my $n = 1;
198                 foreach my $ip ( keys %$seen ) {
199                         $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
200                         $0 = $variant . '_poll ' . $n++;
201                 }
202
203                 $0 = $variant . "_poll wait";
204                 warn "# wait";
205                 $taskset->wait;
206
207                 $redis->set( "$variant.poll.finish" => $self->datetime_now );
208
209                 my $status = APKPM::Model->redis_status;
210                 my $row = $status->{poll}->{$variant};
211                 $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
212                 $row->{variant} = $variant;
213                 $self->do_background_json('Store_insert', { _table => 'dslam_poll', %$row });
214                 warn "# dslam_poll ",dump($row);
215
216                 my $ips = scalar keys %$seen;
217                 my $ports = 0;
218                 $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
219                 warn "# finish $ips ips with $ports ports\n";
220                 $0 = $variant . "_poll last ips:$ips ports:$ports";
221                 return { ips => $ips, ports => $ports, dslam_poll => $row }  ; # body end
222                 }
223         } ],
224 });
225
226 open(my $pid, '>', "/tmp/apkpm.$variant.pid");
227 print $pid "$$\n";
228 close $pid;
229
230 $driver->run;
231