91ec64f6812e18c310dd613abfff62e3c3ff24e1
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
3 BEGIN {
4         $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
5 }
6
7 package Cog; # persistant Gearman Worker - see the pun?
8
9 use Moose;
10 use JSON::XS qw();
11 extends 'Gearman::Driver::Worker::Base';
12
13 use lib 'lib';
14 with 'APKPM::Gearman';
15 with 'APKPM::Gearman::Client';
16
17 sub prefix { '' }
18
19 package main;
20
21 use Gearman::Driver;
22 use Data::Dump qw(dump);
23
24 use Regexp::Common qw(net);
25 use Redis;
26
27 use lib 'lib';
28 use H1::ZTEDSLAM;
29 use H1::ZTEMSAN;
30
31 my ($variant,$max_processes) = @ARGV;
32 $variant ||= 'ZTEDSLAM';
33 $max_processes ||= 1;
34 warn "variant: $variant max_processes: $max_processes\n";
35
36 my $driver = Gearman::Driver->new(
37         server   => 'localhost:4730',
38         interval => 15,
39         loglevel   => 'DEBUG',
40         logfile    => "log/$variant.log",
41 ) || die $!;
42
43 my $w1 = 'Cog';
44 my $worker = $w1->new();
45
46
47 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
48
49
50 my $poll;
51
52 foreach my $user ( @$crm ) {
53         next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
54         $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
55 }
56
57 warn "# poll = ", dump($poll);
58
59 foreach my $ip ( keys %$poll ) {
60
61         my $method = $variant . '_' . $ip;
62
63         my $module = 'H1::' . $variant;
64         my $zte = $module->new( ip => $ip );
65
66         my $ports = keys %{ $poll->{$ip} };
67         my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
68         $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
69
70         $driver->add_job({
71                 max_processes => $max_procs,
72                 min_processes => 0,
73                 name            => $method,
74                 worker          => $worker,
75                 methods    => [
76                         {
77                                 decode => 'd_json',
78 #                               encode => 'e_json',
79                                 name   => $method,
80                                 body   => sub {
81
82 # XXX -- worker body
83
84                 my ( $self, $job, $crm ) = @_;
85
86                 my $redis = Redis->new;
87
88                 if ( exists $crm->{logout} ) {
89                         $zte->logout;
90                         $redis->sadd("$variant.$ip.logout", $self->datetime_now);
91                         return "logout ok"; # XXX return scalar!
92                 }
93
94                 my $port     = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
95                 my $username = $crm->{USERNAME}        || die "no USERNAME";
96
97                 my $hash;
98                 eval { $hash = $zte->hash( $port ) };
99
100                 if ( $@ ) {
101                         $redis->sadd("$variant.$ip.error" => $@);
102                         return "error: $@";
103                 } elsif ( ! $hash ) {
104                         $redis->sadd("$variant.$ip.empty" => $port);
105                         return "empty";
106                 } else {
107                         $redis->sadd("$variant.$ip.ok" => $port);
108                 }
109
110                 $self->do_background_json( 'Store_insert', {
111                         _table => 'dslam_h',
112                         ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
113                         username => $username,
114                         timestamp => $self->datetime_now,
115                         h => $self->to_hstore($hash),
116                 });
117
118                 return "ok $username $ip $port"; # body end
119
120 # XXX -- worker body
121                                 },
122                         },
123                 ]
124         });
125
126         warn "$method for $ip with $ports ports using $max_procs processes\n";
127
128 }
129
130 $driver->add_job({
131         max_processes => 1,
132         min_processes => 0,
133         worker          => $worker,
134         name            => "poll_$variant",
135         methods    => [ {
136                 name            => "poll_$variant",
137                 encode => 'e_json',
138                 body   => sub {
139
140                 my ( $self, $job, $workload ) = @_;
141
142                 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
143
144                 warn "# crm $crm";
145
146                 my $gc = Gearman::Client->new;
147                 $gc->job_servers( $self->server || 'localhost:4730' );
148                 my $taskset = $gc->new_task_set;
149                 my $seen;
150
151                 my $redis = Redis->new;
152                 $redis->del( $_ ) foreach $redis->keys("$variant.*");
153
154                 $redis->set( "$variant.poll.start" => $self->datetime_now );
155
156                 foreach my $user ( @$crm ) {
157                         # required CRM fields for worker
158                         my $ip   = $user->{IP_UREDAJA}      || next;
159                         my $port = $user->{SHELF_SLOT_PORT} || next;
160                         my $username = $user->{USERNAME}    || next;
161
162                         if ( $ip !~ /$RE{net}{IPv4}/ ) {
163                                 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
164                                 next;
165                         } elsif ( $port !~ m{\d+(/\d+)+} ) {
166                                 $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
167                                 next;
168                         } elsif ( ! $username ) {
169                                 $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
170                                 next;
171                         } elsif ( $seen->{ $ip }->{ $port }++ ) {
172                                 $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
173                                 next;
174                         }
175
176                         $redis->incr("$variant.poll.queued");
177                         $redis->sadd("$variant.$ip.queued" => $port);
178
179                         my $name = $variant . '_' . $ip;
180                         $taskset->add_task( $name, $self->e_json( $user ), {
181                                 on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
182                                 on_fail     => sub { $redis->sadd("$variant.$ip.failed" => $port) },
183                         } )
184                 }
185
186                 warn "# queue logouts";
187                 foreach my $ip ( keys %$seen ) {
188                         $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
189                 }
190
191                 warn "# wait";
192                 $taskset->wait;
193
194                 $redis->set( "$variant.poll.finish" => $self->datetime_now );
195
196                 my $ips = scalar keys %$seen;
197                 my $ports = 0;
198                 $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
199                 warn "# finish $ips ips with $ports ports\n";
200                 return { ips => $ips, ports => $ports }  ; # body end
201                 }
202         } ],
203 });
204
205 $driver->run;
206