specify ZTEDSLAM (default) or ZTEMSAN in command line
[APKPM.git] / persistant_worker.pl
index cefe70f..1478f4a 100755 (executable)
@@ -6,21 +6,26 @@ use Moose;
 use JSON::XS qw();
 extends 'Gearman::Driver::Worker::Base';
 
-sub prefix { '' }
-
-sub encode_json {
-       my ( $self, $result ) = @_;
-       return JSON::XS::encode_json($result);
-}
+use lib 'lib';
+with 'APKPM::Gearman';
+with 'APKPM::Gearman::Client';
 
-sub decode_json {
-       my ( $self, $workload ) = @_;
-       return JSON::XS::decode_json($workload);
-}
+sub prefix { '' }
 
 package main;
 
 use Gearman::Driver;
+use Data::Dump qw(dump);
+
+use Regexp::Common qw(net);
+use Redis;
+
+use lib 'lib';
+use H1::ZTEDSLAM;
+use H1::ZTEMSAN;
+
+my $variant = $ARGV[0] || 'ZTEDSLAM';
+warn "variant: $variant\n";
 
 my $driver = Gearman::Driver->new(
        server   => 'localhost:4730',
@@ -32,33 +37,141 @@ my $driver = Gearman::Driver->new(
 my $w1 = 'Cog';
 my $worker = $w1->new();
 
-$worker->meta->add_method( 'scale_image' => sub {
-       my ( $self, $job, $workload ) = @_;
-       warn "# scale_image $job $workload";
-       # do something
-       return { scale_image => $workload };
-});
 
+my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
 
-# run each method in an own process
-foreach my $method (qw(scale_image )) {
-       $driver->add_job(
-               {
-                       max_processes => 5,
-                       min_processes => 1,
-                       name            => $method,
-                       worker          => $worker,
-                       methods    => [
-                               {
-                                       body   => $w1->meta->find_method_by_name($method)->body,
-#                                      decode => 'decode_json',
-                                       encode => 'encode_json',
-                                       name   => $method,
-                               },
-                       ]
+
+my $poll;
+
+foreach my $user ( @$crm ) {
+       next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
+       $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
+}
+
+warn "# poll = ", dump($poll);
+
+foreach my $ip ( keys %$poll ) {
+
+       my $method = $variant . '_' . $ip;
+
+       my $module = 'H1::' . $variant;
+       my $zte = $module->new( ip => $ip );
+
+       $driver->add_job({
+               max_processes => 1, # FIXME increase?
+               min_processes => 1,
+               name            => $method,
+               worker          => $worker,
+               methods    => [
+                       {
+                               decode => 'd_json',
+#                              encode => 'e_json',
+                               name   => $method,
+                               body   => sub {
+
+               my ( $self, $job, $crm ) = @_;
+
+               if ( exists $crm->{logout} ) {
+                       $zte->logout;
+                       return;
+               }
+
+               my $hash;
+               eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
+
+               my $redis = Redis->new;
+
+               if ( $@ ) {
+                       $redis->sadd("$variant.error" => $@);
+                       return { error => $@ };
+               } else {
+                       $redis->sadd("$variant.ok" => $crm->{USERNAME});
                }
-       );
+
+               $self->do_background_json( 'Store_insert', {
+                       _table => 'dslam_h',
+                       ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
+                       username => $crm->{USERNAME},
+                       timestamp => $self->datetime_now,
+                       h => $self->to_hstore($hash),
+               });
+
+#              return $hash; # body end
+
+                               },
+                       },
+               ]
+       });
+
+       warn "$method added\n";
+
 }
 
+$driver->add_job({
+       max_processes => 1,
+       min_processes => 1,
+       worker          => $worker,
+       name            => "poll_$variant",
+       methods    => [ {
+               name            => "poll_$variant",
+               body   => sub {
+
+               my ( $self, $job, $workload ) = @_;
+
+               my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
+
+               warn "# crm $crm";
+
+               my $gc = Gearman::Client->new;
+               $gc->job_servers( $self->server || 'localhost:4730' );
+               my $taskset = $gc->new_task_set;
+               my $seen;
+
+               my $redis = Redis->new;
+               $redis->del( $_ ) foreach $redis->keys("$variant.*");
+
+               $redis->set( "$variant.start" => $self->datetime_now );
+
+               foreach my $user ( @$crm ) {
+                       my $ip   = $user->{IP_UREDAJA};
+                       my $port = $user->{SHELF_SLOT_PORT};
+
+                       if ( $ip !~ /$RE{net}{IPv4}/ ) {
+                               $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
+                               next;
+                       } elsif ( $port !~ m{\d+(/\d+)+} ) {
+                               $redis->sadd("$variant.invalid.PORT" => $port);
+                               next;
+                       } elsif ( $seen->{ $ip }->{ $port }++ ) {
+                               $redis->sadd("$variant.invalid.duplicate" => "$ip $port");
+                               next;
+                       }
+                       $redis->sadd("$variant.queued" => "$ip $port");
+                       $redis->incr("$variant.ip.$ip");
+
+                       my $name = $variant . '_' . $ip;
+                       $taskset->add_task( $name, $self->e_json( $user ), {
+                               on_complete => sub { $redis->sadd("$variant.complete", "$ip $port") },
+                               on_fail     => sub { $redis->sadd("$variant.fail", "$ip $port") },
+                       } )
+               }
+
+               warn "# queue logouts";
+               foreach my $ip ( keys %$seen ) {
+                       $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
+               }
+
+               warn "# wait";
+               $taskset->wait;
+
+               $redis->set( "$variant.finish" => $self->datetime_now );
+               warn "# seen ", dump($seen);
+
+               my $ips = scalar keys %$seen;
+               return "polled $ips IPs"; # body end
+               }
+       } ],
+});
+
 $driver->run;