modify schema for new CPE stats
[APKPM.git] / persistant_worker.pl
index 82339cd..29a0ece 100755 (executable)
@@ -1,5 +1,9 @@
 #!/usr/bin/perl
 
+BEGIN {
+       $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
+}
+
 package Cog; # persistant Gearman Worker - see the pun?
 
 use Moose;
@@ -23,6 +27,7 @@ use Redis;
 use lib 'lib';
 use H1::ZTEDSLAM;
 use H1::ZTEMSAN;
+use APKPM::Model;
 
 my ($variant,$max_processes) = @ARGV;
 $variant ||= 'ZTEDSLAM';
@@ -31,7 +36,7 @@ warn "variant: $variant max_processes: $max_processes\n";
 
 my $driver = Gearman::Driver->new(
        server   => 'localhost:4730',
-       interval => 60,
+       interval => 15,
        loglevel   => 'DEBUG',
        logfile    => "log/$variant.log",
 ) || die $!;
@@ -59,9 +64,13 @@ foreach my $ip ( keys %$poll ) {
        my $module = 'H1::' . $variant;
        my $zte = $module->new( ip => $ip );
 
+       my $ports = keys %{ $poll->{$ip} };
+       my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
+       $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
+
        $driver->add_job({
-               max_processes => $max_processes,
-               min_processes => 1,
+               max_processes => $max_procs,
+               min_processes => 0,
                name            => $method,
                worker          => $worker,
                methods    => [
@@ -115,17 +124,18 @@ foreach my $ip ( keys %$poll ) {
                ]
        });
 
-       warn "$method added\n";
+       warn "$method for $ip with $ports ports using $max_procs processes\n";
 
 }
 
 $driver->add_job({
        max_processes => 1,
-       min_processes => 1,
+       min_processes => 0,
        worker          => $worker,
        name            => "poll_$variant",
        methods    => [ {
                name            => "poll_$variant",
+               encode => 'e_json',
                body   => sub {
 
                my ( $self, $job, $workload ) = @_;
@@ -183,13 +193,26 @@ $driver->add_job({
                $taskset->wait;
 
                $redis->set( "$variant.poll.finish" => $self->datetime_now );
-               warn "# seen ", dump($seen);
+
+               my $status = APKPM::Model->redis_status;
+               my $row = $status->{poll}->{$variant};
+               $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
+               $row->{variant} = $variant;
+               $self->do_background_json('Store_insert', { _table => 'zte_poll', %$row });
+               warn "# zte_poll ",dump($row);
 
                my $ips = scalar keys %$seen;
-               return "polled $ips IPs"; # body end
+               my $ports = 0;
+               $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
+               warn "# finish $ips ips with $ports ports\n";
+               return { ips => $ips, ports => $ports, zte_poll => $row }  ; # body end
                }
        } ],
 });
 
+open(my $pid, '>', "/tmp/apkpm.$variant.pid");
+print $pid "$$\n";
+close $pid;
+
 $driver->run;