e2db7d8781ec724bc4e5a3792121620a6c0ae665
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
# Cog - the persistent Gearman worker class (a cog among the gears).
# Instances are handed to Gearman::Driver as the "worker" of every job
# registered further down in this file.
package Cog;

use Moose;
use JSON::XS ();
extends 'Gearman::Driver::Worker::Base';

use lib 'lib';
with 'APKPM::Gearman';
with 'APKPM::Gearman::Client';

# Overrides the base class prefix with an empty string.
# NOTE(review): presumably this makes Gearman::Driver register functions
# under their bare method names - confirm against Gearman::Driver::Worker::Base.
sub prefix {
    return '';
}
14
15 package main;
16
17 use Gearman::Driver;
18 use Data::Dump qw(dump);
19
20 use Regexp::Common qw(net);
21
22 use lib 'lib';
23 use H1::ZTEDSLAM;
24
# Driver talking to the gearmand at localhost:4730; it logs at DEBUG level
# into log/persistant.log.
my $driver = Gearman::Driver->new(
    server   => 'localhost:4730',
    interval => 60,
    loglevel => 'DEBUG',
    logfile  => 'log/persistant.log',
) || die $!;

# One worker instance is shared by every job added below.
my $w1     = 'Cog';
my $worker = $w1->new();

# Fetch the CRM rows matching TIP_UREDJAJA:ZTEDSLAM once at startup.
# NOTE(review): presumably this selects all devices of type ZTEDSLAM - confirm
# against the CRM_search worker.
warn "CRM_search";
my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );

# $poll->{ device IP }{ SHELF_SLOT_PORT } = number of CRM rows seen for it.
# Only the first-level keys (the IPs) are consumed later in this file.
my $poll;

for my $user (@$crm) {
    my $ip = $user->{IP_UREDAJA};

    # Skip rows whose device address does not contain an IPv4 address.
    next if $ip !~ /$RE{net}{IPv4}/;

    $poll->{$ip}->{ $user->{SHELF_SLOT_PORT} }++;
}

warn "# poll = ", dump($poll);
46
# Register one single-process job per DSLAM IP collected in $poll.
# The Gearman function is named "ZTEDSLAM_<ip>" and each job closes over its
# own H1::ZTEDSLAM object, which is created here, before $driver->run.
for my $ip ( keys %$poll ) {

    my $method = "ZTEDSLAM_$ip";
    my $zte    = H1::ZTEDSLAM->new( ip => $ip );

    # Job body. Arguments: the worker ($self), the Gearman job and the
    # workload as decoded by the 'd_json' hook configured below.
    # Returns { error => ... } when $zte->hash dies; otherwise the value of
    # the do_background_json call, which is the last statement evaluated.
    my $body = sub {
        my ( $self, $job, $args ) = @_;

        my $hash;
        eval { $hash = $zte->hash( $args->{SHELF_SLOT_PORT} ) };

        return { error => $@ } if $@;

        # Hand the result to Store_insert in the background (table dslam_h).
        $self->do_background_json(
            'Store_insert',
            {
                _table    => 'dslam_h',
                ip        => $args->{IP_MANAGEMENT},    # FIXME IP_UREDAJA ?
                username  => $args->{USERNAME},
                timestamp => $self->datetime_now,
                h         => $self->to_hstore($hash),
            }
        );

        # return $hash; # body end
    };

    my %method_spec = (
        name   => $method,
        decode => 'd_json',
        encode => 'e_json',
        body   => $body,
    );

    $driver->add_job(
        {
            name          => $method,
            worker        => $worker,
            min_processes => 1,
            max_processes => 1,    # FIXME increase?
            methods       => [ \%method_spec ],
        }
    );

    warn "$method added\n";

}
89
# Register the dispatcher job "poll_ZTEDSLAM".
# Each invocation queries CRM_search again, submits one "ZTEDSLAM_<ip>" task
# per distinct (IP_UREDAJA, SHELF_SLOT_PORT) pair and waits for the whole
# task set to finish. The job returns nothing.
$driver->add_job({
    max_processes => 1,
    min_processes => 1,
    # The job is named after its method, as the per-DSLAM jobs are.
    name          => 'poll_ZTEDSLAM',
    worker        => $worker,
    methods       => [ {
        name => 'poll_ZTEDSLAM',
        body => sub {

            # $self is the worker, $job the Gearman job; $workload is unused.
            my ( $self, $job, $workload ) = @_;

            my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );

            warn "# crm $crm";

            # Use $self like every other call in this body rather than the
            # file-scope $worker.
            my $taskset = $self->gc->new_task_set;

            # $seen->{ip}{shelf_slot_port} counts CRM rows per port, so each
            # port is submitted only once per poll.
            my $seen;

            foreach my $user ( @$crm ) {
                my $ip = $user->{IP_UREDAJA};
                next unless $ip =~ /$RE{net}{IPv4}/;

                # Count every row, but submit a task only for the first one.
                next if $seen->{$ip}->{ $user->{SHELF_SLOT_PORT} }++;

                my $name = 'ZTEDSLAM_' . $ip;
                $taskset->add_task( $name, $self->e_json( $user ), {
                    # Log completion; the message used to be built and
                    # silently discarded.
                    on_complete => sub { warn "# $name done", $/ },
                } );
            }

            warn "# wait";
            $taskset->wait;

            warn "# seen ", dump($seen);

            return; # body end
        },
    } ],
});

# Start the driver with all jobs registered above.
$driver->run;
127