working ZTEDSLAM poller with persistant workers for each ip
[APKPM.git] / persistant_worker.pl
1 #!/usr/bin/perl
2
3 package Cog; # persistant Gearman Worker - see the pun?
4
5 use Moose;
6 use JSON::XS qw();
7 extends 'Gearman::Driver::Worker::Base';
8
9 use lib 'lib';
10 with 'APKPM::Gearman';
11 with 'APKPM::Gearman::Client';
12
13 sub prefix { '' }
14
15 package main;
16
17 use Gearman::Driver;
18 use Data::Dump qw(dump);
19
20 use lib 'lib';
21 use H1::ZTEDSLAM;
22
23 my $driver = Gearman::Driver->new(
24         server   => 'localhost:4730',
25         interval => 60,
26         loglevel   => 'DEBUG',
27         logfile    => 'log/persistant.log',
28 ) || die $!;
29
30 my $w1 = 'Cog';
31 my $worker = $w1->new();
32
33 warn "CRM_search";
34 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
35
36 my $poll;
37
38 foreach my $user ( @$crm ) {
39         $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
40 }
41
42 warn "# poll = ", dump($poll);
43
44 foreach my $ip ( keys %$poll ) {
45
46         my $method = "ZTEDSLAM_$ip";
47
48         $driver->add_job({
49                 max_processes => 1, # FIXME increase?
50                 min_processes => 1,
51                 name            => $method,
52                 worker          => $worker,
53                 methods    => [
54                         {
55                                 decode => 'd_json',
56                                 encode => 'e_json',
57                                 name   => $method,
58                                 body   => sub {
59
60                 my ( $self, $job, $crm ) = @_;
61                 my $hash;
62                 eval { $hash = H1::ZTEDSLAM->hash( $ip, $crm->{SHELF_SLOT_PORT} ) };
63
64                 return { error => $@ } if $@;
65
66                 $self->do_background_json( 'Store_insert', {
67                         _table => 'dslam_h',
68                         ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
69                         username => $crm->{USERNAME},
70                         timestamp => $self->datetime_now,
71                         h => $self->to_hstore($hash),
72                 });
73
74 #               return $hash; # body end
75
76                                 },
77                         },
78                 ]
79         });
80
81         warn "$method added\n";
82
83 }
84
85 $driver->add_job({
86         max_processes => 1,
87         min_processes => 1,
88         worker          => $worker,
89         methods    => [ {
90                 name            => 'poll_ZTEDSLAM',
91                 body   => sub {
92
93                 my ( $self, $job, $workload ) = @_;
94
95                 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
96
97                 warn "# crm $crm";
98
99                 my $taskset = $worker->gc->new_task_set;
100                 my $seen;
101
102                 foreach my $user ( @$crm ) {
103                         my $name = 'ZTEDSLAM_' . $user->{IP_UREDAJA};
104                         $taskset->add_task( $name, $self->e_json( $user ), {
105                                 on_complete => sub { "# $name done ", dump($user), $/ }
106                         } )
107                         if ! $seen->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
108                 }
109                 warn "# wait";
110                 $taskset->wait;
111
112                 warn "# seen ", dump($seen);
113
114                 return; # body end
115                 }
116         } ],
117 });
118
119 $driver->run;
120