modify schema for new CPE stats
[APKPM.git] / persistant_worker.pl
index 26ce184..29a0ece 100755 (executable)
@@ -1,5 +1,9 @@
 #!/usr/bin/perl
 
+BEGIN {
+       $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
+}
+
 package Cog; # persistant Gearman Worker - see the pun?
 
 use Moose;
@@ -22,19 +26,26 @@ use Redis;
 
 use lib 'lib';
 use H1::ZTEDSLAM;
+use H1::ZTEMSAN;
+use APKPM::Model;
+
+my ($variant,$max_processes) = @ARGV;
+$variant ||= 'ZTEDSLAM';
+$max_processes ||= 1;
+warn "variant: $variant max_processes: $max_processes\n";
 
 my $driver = Gearman::Driver->new(
        server   => 'localhost:4730',
-       interval => 60,
+       interval => 15,
        loglevel   => 'DEBUG',
-       logfile    => 'log/persistant.log',
+       logfile    => "log/$variant.log",
 ) || die $!;
 
 my $w1 = 'Cog';
 my $worker = $w1->new();
 
 
-my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
+my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
 
 
 my $poll;
@@ -48,13 +59,18 @@ warn "# poll = ", dump($poll);
 
 foreach my $ip ( keys %$poll ) {
 
-       my $method = "ZTEDSLAM_$ip";
+       my $method = $variant . '_' . $ip;
+
+       my $module = 'H1::' . $variant;
+       my $zte = $module->new( ip => $ip );
 
-       my $zte = H1::ZTEDSLAM->new( ip => $ip );
+       my $ports = keys %{ $poll->{$ip} };
+       my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
+       $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
 
        $driver->add_job({
-               max_processes => 1, # FIXME increase?
-               min_processes => 1,
+               max_processes => $max_procs,
+               min_processes => 0,
                name            => $method,
                worker          => $worker,
                methods    => [
@@ -64,56 +80,67 @@ foreach my $ip ( keys %$poll ) {
                                name   => $method,
                                body   => sub {
 
+# XXX -- worker body
+
                my ( $self, $job, $crm ) = @_;
 
+               my $redis = Redis->new;
+
                if ( exists $crm->{logout} ) {
                        $zte->logout;
-                       return;
+                       $redis->sadd("$variant.$ip.logout", $self->datetime_now);
+                       return "logout ok"; # XXX return scalar!
                }
 
-               my $hash;
-               eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
+               my $port     = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
+               my $username = $crm->{USERNAME}        || die "no USERNAME";
 
-               my $redis = Redis->new;
+               my $hash;
+               eval { $hash = $zte->hash( $port ) };
 
                if ( $@ ) {
-                       $redis->sadd("ZTEDSLAM.error" => $@);
-                       return { error => $@ };
+                       $redis->sadd("$variant.$ip.error" => $@);
+                       return "error: $@";
+               } elsif ( ! $hash ) {
+                       $redis->sadd("$variant.$ip.empty" => $port);
+                       return "empty";
                } else {
-                       $redis->sadd("ZTEDSLAM.ok" => $crm->{USERNAME});
+                       $redis->sadd("$variant.$ip.ok" => $port);
                }
 
                $self->do_background_json( 'Store_insert', {
                        _table => 'dslam_h',
                        ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
-                       username => $crm->{USERNAME},
+                       username => $username,
                        timestamp => $self->datetime_now,
                        h => $self->to_hstore($hash),
                });
 
-#              return $hash; # body end
+               return "ok $username $ip $port"; # body end
 
+# XXX -- worker body
                                },
                        },
                ]
        });
 
-       warn "$method added\n";
+       warn "$method for $ip with $ports ports using $max_procs processes\n";
 
 }
 
 $driver->add_job({
        max_processes => 1,
-       min_processes => 1,
+       min_processes => 0,
        worker          => $worker,
-       name            => 'poll_ZTEDSLAM',
+       name            => "poll_$variant",
        methods    => [ {
-               name            => 'poll_ZTEDSLAM',
+               name            => "poll_$variant",
+               encode => 'e_json',
                body   => sub {
 
                my ( $self, $job, $workload ) = @_;
 
-               my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:ZTEDSLAM" );
+               my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
 
                warn "# crm $crm";
 
@@ -123,43 +150,69 @@ $driver->add_job({
                my $seen;
 
                my $redis = Redis->new;
-               $redis->del( $_ ) foreach $redis->keys("ZTEDSLAM.*");
+               $redis->del( $_ ) foreach $redis->keys("$variant.*");
+
+               $redis->set( "$variant.poll.start" => $self->datetime_now );
 
                foreach my $user ( @$crm ) {
-                       my $ip   = $user->{IP_UREDAJA};
-                       my $port = $user->{SHELF_SLOT_PORT};
+                       # required CRM fields for worker
+                       my $ip   = $user->{IP_UREDAJA}      || next;
+                       my $port = $user->{SHELF_SLOT_PORT} || next;
+                       my $username = $user->{USERNAME}    || next;
 
                        if ( $ip !~ /$RE{net}{IPv4}/ ) {
-                               $redis->sadd("ZTEDSLAM.invalid.IP_UREDAJA" => $ip);
+                               $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
                                next;
                        } elsif ( $port !~ m{\d+(/\d+)+} ) {
-                               $redis->sadd("ZTEDSLAM.invalid.PORT" => $port);
+                               $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
+                               next;
+                       } elsif ( ! $username ) {
+                               $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
+                               next;
+                       } elsif ( $seen->{ $ip }->{ $port }++ ) {
+                               $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
                                next;
                        }
-                       $redis->sadd("ZTEDSLAM.queued" => "$ip $port");
 
-                       my $name = 'ZTEDSLAM_' . $ip;
+                       $redis->incr("$variant.poll.queued");
+                       $redis->sadd("$variant.$ip.queued" => $port);
+
+                       my $name = $variant . '_' . $ip;
                        $taskset->add_task( $name, $self->e_json( $user ), {
-                               on_complete => sub { $redis->sadd("ZTEDSLAM.complete", "$ip $port") },
-                               on_fail     => sub { $redis->sadd("ZTEDSLAM.fail", "$ip $port") },
+                               on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
+                               on_fail     => sub { $redis->sadd("$variant.$ip.failed" => $port) },
                        } )
-                       if ! $seen->{ $ip }->{ $port }++;
                }
 
                warn "# queue logouts";
                foreach my $ip ( keys %$seen ) {
-                       $taskset->add_task( "ZTEDSLAM_$ip", $self->e_json( { logout => 1 } ) );
+                       $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
                }
 
                warn "# wait";
                $taskset->wait;
 
-               warn "# seen ", dump($seen);
+               $redis->set( "$variant.poll.finish" => $self->datetime_now );
 
-               return; # body end
+               my $status = APKPM::Model->redis_status;
+               my $row = $status->{poll}->{$variant};
+               $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
+               $row->{variant} = $variant;
+               $self->do_background_json('Store_insert', { _table => 'zte_poll', %$row });
+               warn "# zte_poll ",dump($row);
+
+               my $ips = scalar keys %$seen;
+               my $ports = 0;
+               $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
+               warn "# finish $ips ips with $ports ports\n";
+               return { ips => $ips, ports => $ports, zte_poll => $row }  ; # body end
                }
        } ],
 });
 
+open(my $pid, '>', "/tmp/apkpm.$variant.pid");
+print $pid "$$\n";
+close $pid;
+
 $driver->run;