specify ZTEDSLAM (default) or ZTEMSAN in command line
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
3 package Cog; # persistant Gearman Worker - see the pun?
4
5 use Moose;
6 use JSON::XS qw();
7 extends 'Gearman::Driver::Worker::Base';
8
9 use lib 'lib';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
12
13 sub prefix { '' }
14
15 package main;
16
17 use Gearman::Driver;
18 use Data::Dump qw(dump);
19
20 use Regexp::Common qw(net);
21 use Redis;
22
23 use lib 'lib';
24 use H1::ZTEDSLAM;
25 use H1::ZTEMSAN;
26
27 my $variant = $ARGV[0] || 'ZTEDSLAM';
28 warn "variant: $variant\n";
29
30 my $driver = Gearman::Driver->new(
31         server   => 'localhost:4730',
32         interval => 60,
33         loglevel   => 'DEBUG',
34         logfile    => 'log/persistant.log',
35 ) || die $!;
36
37 my $w1 = 'Cog';
38 my $worker = $w1->new();
39
40
41 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
42
43
44 my $poll;
45
46 foreach my $user ( @$crm ) {
47         next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
48         $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
49 }
50
51 warn "# poll = ", dump($poll);
52
53 foreach my $ip ( keys %$poll ) {
54
55         my $method = $variant . '_' . $ip;
56
57         my $module = 'H1::' . $variant;
58         my $zte = $module->new( ip => $ip );
59
60         $driver->add_job({
61                 max_processes => 1, # FIXME increase?
62                 min_processes => 1,
63                 name            => $method,
64                 worker          => $worker,
65                 methods    => [
66                         {
67                                 decode => 'd_json',
68 #                               encode => 'e_json',
69                                 name   => $method,
70                                 body   => sub {
71
72                 my ( $self, $job, $crm ) = @_;
73
74                 if ( exists $crm->{logout} ) {
75                         $zte->logout;
76                         return;
77                 }
78
79                 my $hash;
80                 eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
81
82                 my $redis = Redis->new;
83
84                 if ( $@ ) {
85                         $redis->sadd("$variant.error" => $@);
86                         return { error => $@ };
87                 } else {
88                         $redis->sadd("$variant.ok" => $crm->{USERNAME});
89                 }
90
91                 $self->do_background_json( 'Store_insert', {
92                         _table => 'dslam_h',
93                         ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
94                         username => $crm->{USERNAME},
95                         timestamp => $self->datetime_now,
96                         h => $self->to_hstore($hash),
97                 });
98
99 #               return $hash; # body end
100
101                                 },
102                         },
103                 ]
104         });
105
106         warn "$method added\n";
107
108 }
109
110 $driver->add_job({
111         max_processes => 1,
112         min_processes => 1,
113         worker          => $worker,
114         name            => "poll_$variant",
115         methods    => [ {
116                 name            => "poll_$variant",
117                 body   => sub {
118
119                 my ( $self, $job, $workload ) = @_;
120
121                 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
122
123                 warn "# crm $crm";
124
125                 my $gc = Gearman::Client->new;
126                 $gc->job_servers( $self->server || 'localhost:4730' );
127                 my $taskset = $gc->new_task_set;
128                 my $seen;
129
130                 my $redis = Redis->new;
131                 $redis->del( $_ ) foreach $redis->keys("$variant.*");
132
133                 $redis->set( "$variant.start" => $self->datetime_now );
134
135                 foreach my $user ( @$crm ) {
136                         my $ip   = $user->{IP_UREDAJA};
137                         my $port = $user->{SHELF_SLOT_PORT};
138
139                         if ( $ip !~ /$RE{net}{IPv4}/ ) {
140                                 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
141                                 next;
142                         } elsif ( $port !~ m{\d+(/\d+)+} ) {
143                                 $redis->sadd("$variant.invalid.PORT" => $port);
144                                 next;
145                         } elsif ( $seen->{ $ip }->{ $port }++ ) {
146                                 $redis->sadd("$variant.invalid.duplicate" => "$ip $port");
147                                 next;
148                         }
149                         $redis->sadd("$variant.queued" => "$ip $port");
150                         $redis->incr("$variant.ip.$ip");
151
152                         my $name = $variant . '_' . $ip;
153                         $taskset->add_task( $name, $self->e_json( $user ), {
154                                 on_complete => sub { $redis->sadd("$variant.complete", "$ip $port") },
155                                 on_fail     => sub { $redis->sadd("$variant.fail", "$ip $port") },
156                         } )
157                 }
158
159                 warn "# queue logouts";
160                 foreach my $ip ( keys %$seen ) {
161                         $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
162                 }
163
164                 warn "# wait";
165                 $taskset->wait;
166
167                 $redis->set( "$variant.finish" => $self->datetime_now );
168                 warn "# seen ", dump($seen);
169
170                 my $ips = scalar keys %$seen;
171                 return "polled $ips IPs"; # body end
172                 }
173         } ],
174 });
175
176 $driver->run;
177