create private Gearman::Client to make taskset work
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
package Cog; # persistent Gearman Worker - see the pun?

# Worker class whose instance is handed to Gearman::Driver->add_job below.
# It adds no behaviour of its own beyond prefix(): everything else comes from
# the Gearman::Driver base class and the two APKPM roles.
use Moose;
use JSON::XS qw();
extends 'Gearman::Driver::Worker::Base';

# The APKPM::* roles are loaded from ./lib, so @INC must be extended first.
use lib 'lib';
with 'APKPM::Gearman';
with 'APKPM::Gearman::Client';

# Always returns the empty string.
# NOTE(review): Gearman::Driver presumably prepends this value to every
# registered function name, so '' keeps job names such as "ZTEDSLAM_<ip>"
# unprefixed - confirm against the Gearman::Driver::Worker documentation.
sub prefix {
	return '';
}
14
15 package main;
16
17 use Gearman::Driver;
18 use Data::Dump qw(dump);
19
20 use Regexp::Common qw(net);
21 use Redis;
22
23 use lib 'lib';
24 use H1::ZTEDSLAM;
25
# Driver object to which all jobs below are added; configured for the local
# gearmand and logging at DEBUG level to log/persistant.log.
my $driver = Gearman::Driver->new(
	logfile  => 'log/persistant.log',
	loglevel => 'DEBUG',
	interval => 60,
	server   => 'localhost:4730',
) || die $!;

# A single Cog instance is shared by every job registered in this file.
my $worker = Cog->new();

# Startup query: the rows returned for ZTE DSLAM devices decide which
# per-IP jobs get registered further down.
my $crm_rows = $worker->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );

# $poll->{ ip }->{ shelf/slot/port } = number of CRM rows seen for that pair
my $poll;

for my $user ( @$crm_rows ) {
	my $ip = $user->{IP_UREDAJA};

	# rows whose device address does not contain an IPv4 address are ignored
	if ( $ip =~ /$RE{net}{IPv4}/ ) {
		$poll->{ $ip }->{ $user->{SHELF_SLOT_PORT} }++;
	}
}

warn "# poll = ", dump($poll);
48
# Register one job per DSLAM IP collected in $poll at startup.
#
# Every job is named "ZTEDSLAM_<ip>", is added with min_processes and
# max_processes both set to 1, and closes over its own H1::ZTEDSLAM object
# created here, before the driver is started.
#
# Workload (decoded by 'd_json' into a hash reference):
#   { logout => ... }                      - call $zte->logout and return
#   { SHELF_SLOT_PORT, USERNAME, IP_MANAGEMENT, ... }
#                                          - fetch the port hash from the device,
#                                            record the outcome in Redis and queue
#                                            a Store_insert into table dslam_h
foreach my $ip ( keys %$poll ) {

	my $method = "ZTEDSLAM_$ip";

	my $zte = H1::ZTEDSLAM->new( ip => $ip );

	$driver->add_job({
		max_processes => 1, # FIXME increase?
		min_processes => 1,
		name          => $method,
		worker        => $worker,
		methods       => [
			{
				decode => 'd_json',
#				encode => 'e_json',
				name   => $method,
				body   => sub {

					my ( $self, $job, $crm ) = @_;

					if ( exists $crm->{logout} ) {
						$zte->logout;
						return;
					}

					my $hash;
					eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };

					# Copy $@ right away: it is a global, and any code called
					# after the eval (Redis->new, sadd) may run an eval of its
					# own and reset it, which would hide the failure.
					my $error = $@;

					my $redis = Redis->new;

					if ( $error ) {
						$redis->sadd("ZTEDSLAM.error" => $error);
						# NOTE(review): 'encode' is commented out above, so this
						# hash reference is returned as-is - confirm the caller
						# can make use of it.
						return { error => $error };
					}

					$redis->sadd("ZTEDSLAM.ok" => $crm->{USERNAME});

					# Last statement of the sub: its value is the implicit
					# return value, exactly as before.
					$self->do_background_json( 'Store_insert', {
						_table    => 'dslam_h',
						ip        => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
						username  => $crm->{USERNAME},
						timestamp => $self->datetime_now,
						h         => $self->to_hstore($hash),
					});

#					return $hash; # body end

				},
			},
		]
	});

	warn "$method added\n";

}
104
# Register the "poll_ZTEDSLAM" job and start the driver.
#
# One run of the job:
#   1. fetches the current ZTE DSLAM rows from CRM_search,
#   2. deletes every Redis key matching "ZTEDSLAM.*",
#   3. queues one "ZTEDSLAM_<ip>" task per distinct valid (ip, port) pair,
#      recording invalid rows, queued pairs and task outcomes in Redis sets,
#   4. queues a { logout => 1 } task for every IP that had tasks queued,
#   5. waits for the whole task set.
# The workload argument is not used.
$driver->add_job({
	max_processes => 1,
	min_processes => 1,
	worker        => $worker,
	name          => 'poll_ZTEDSLAM',
	methods       => [ {
		name => 'poll_ZTEDSLAM',
		body => sub {

			my ( $self, $job, $workload ) = @_;

			my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );

			warn "# crm $crm";

			# private client: the task set below is created from it
			my $gc = Gearman::Client->new;
			$gc->job_servers( $self->server || 'localhost:4730' );
			my $taskset = $gc->new_task_set;
			my $seen; # $seen->{ ip }->{ port } = times this pair occurred

			my $redis = Redis->new;
			$redis->del( $_ ) foreach $redis->keys("ZTEDSLAM.*");

			foreach my $user ( @$crm ) {
				# A CRM row may lack either field. Normalize undef to '' so the
				# checks below run without "uninitialized" warnings and Redis is
				# never handed an undefined member; '' fails both patterns, so
				# such rows still land in the matching "invalid" set.
				my $ip   = defined( $user->{IP_UREDAJA} )      ? $user->{IP_UREDAJA}      : '';
				my $port = defined( $user->{SHELF_SLOT_PORT} ) ? $user->{SHELF_SLOT_PORT} : '';

				if ( $ip !~ /$RE{net}{IPv4}/ ) {
					$redis->sadd("ZTEDSLAM.invalid.IP_UREDAJA" => $ip);
					next;
				} elsif ( $port !~ m{\d+(/\d+)+} ) {
					$redis->sadd("ZTEDSLAM.invalid.PORT" => $port);
					next;
				}
				$redis->sadd("ZTEDSLAM.queued" => "$ip $port");

				# queue each (ip, port) pair only once per poll
				my $name = 'ZTEDSLAM_' . $ip;
				$taskset->add_task( $name, $self->e_json( $user ), {
					on_complete => sub { $redis->sadd("ZTEDSLAM.complete", "$ip $port") },
					on_fail     => sub { $redis->sadd("ZTEDSLAM.fail", "$ip $port") },
				} )
				if ! $seen->{ $ip }->{ $port }++;
			}

			warn "# queue logouts";
			foreach my $ip ( keys %$seen ) {
				$taskset->add_task( "ZTEDSLAM_$ip", $self->e_json( { logout => 1 } ) );
			}

			# NOTE(review): wait is called without a timeout. "ZTEDSLAM_<ip>"
			# functions are only registered for IPs known at startup, so a task
			# for an IP that appeared in CRM later presumably never completes
			# and this call would block - confirm and consider a timeout.
			warn "# wait";
			$taskset->wait;

			warn "# seen ", dump($seen);

			return; # body end
		}
	} ],
});

# Hand control to the driver now that all jobs have been added.
$driver->run;
165