use do_high for CRM_search
[APKPM.git] / persistant_worker.pl
index e8a666d..42dabe8 100755 (executable)
@@ -1,5 +1,9 @@
 #!/usr/bin/perl
 
+BEGIN {
+       $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
+}
+
 package Cog; # persistant Gearman Worker - see the pun?
 
 use Moose;
@@ -23,13 +27,16 @@ use Redis;
 use lib 'lib';
 use H1::ZTEDSLAM;
 use H1::ZTEMSAN;
+use APKPM::Model;
 
-my $variant = $ARGV[0] || 'ZTEDSLAM';
-warn "variant: $variant\n";
+my ($variant,$max_processes) = @ARGV;
+$variant ||= 'ZTEDSLAM';
+$max_processes ||= 1;
+warn "variant: $variant max_processes: $max_processes\n";
 
 my $driver = Gearman::Driver->new(
        server   => 'localhost:4730',
-       interval => 60,
+       interval => 15,
        loglevel   => 'DEBUG',
        logfile    => "log/$variant.log",
 ) || die $!;
@@ -57,15 +64,19 @@ foreach my $ip ( keys %$poll ) {
        my $module = 'H1::' . $variant;
        my $zte = $module->new( ip => $ip );
 
+       my $ports = keys %{ $poll->{$ip} };
+       my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
+       $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
+
        $driver->add_job({
-               max_processes => 1, # FIXME increase?
-               min_processes => 1,
+               max_processes => $max_procs,
+               min_processes => 0,
                name            => $method,
                worker          => $worker,
                methods    => [
                        {
                                decode => 'd_json',
-#                              encode => 'e_json',
+                               encode => 'e_json',
                                name   => $method,
                                body   => sub {
 
@@ -78,39 +89,35 @@ foreach my $ip ( keys %$poll ) {
                if ( exists $crm->{logout} ) {
                        $zte->logout;
                        $redis->sadd("$variant.$ip.logout", $self->datetime_now);
-                       return { logout => 'ok' };
+                       return "logout ok"; # XXX return scalar!
                }
 
                my $port     = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
                my $username = $crm->{USERNAME}        || die "no USERNAME";
 
-               if ( ! $port ) {
-                       $redis->sadd("$variant.invalid.SHELF_SLOT_PORT" => $username );
-                       return "invalid port $port";
-               }
-
                my $hash;
                eval { $hash = $zte->hash( $port ) };
 
                if ( $@ ) {
                        $redis->sadd("$variant.$ip.error" => $@);
-                       return "error: $@";
+                       return { error => $@ };
                } elsif ( ! $hash ) {
                        $redis->sadd("$variant.$ip.empty" => $port);
-                       return "empty";
+                       return { error => 'empty' };
                } else {
                        $redis->sadd("$variant.$ip.ok" => $port);
                }
 
                $self->do_background_json( 'Store_insert', {
-                       _table => 'dslam_h',
+                       _table => 'dslam',
                        ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
                        username => $username,
                        timestamp => $self->datetime_now,
-                       h => $self->to_hstore($hash),
+                       variant => $variant,
+                       h => $hash,
                });
 
-               return "ok $username $ip $port"; # body end
+               return $hash; # body end
 
 # XXX -- worker body
                                },
@@ -118,17 +125,18 @@ foreach my $ip ( keys %$poll ) {
                ]
        });
 
-       warn "$method added\n";
+       warn "$method for $ip with $ports ports using $max_procs processes\n";
 
 }
 
 $driver->add_job({
        max_processes => 1,
-       min_processes => 1,
+       min_processes => 0,
        worker          => $worker,
-       name            => "poll_$variant",
+       name            => $variant . '_poll',
        methods    => [ {
-               name            => "poll_$variant",
+               name   => $variant . '_poll',
+               encode => 'e_json',
                body   => sub {
 
                my ( $self, $job, $workload ) = @_;
@@ -145,23 +153,29 @@ $driver->add_job({
                my $redis = Redis->new;
                $redis->del( $_ ) foreach $redis->keys("$variant.*");
 
-               $redis->set( "$variant.time.start" => $self->datetime_now );
+               $redis->set( "$variant.poll.start" => $self->datetime_now );
 
                foreach my $user ( @$crm ) {
-                       my $ip   = $user->{IP_UREDAJA};
-                       my $port = $user->{SHELF_SLOT_PORT};
+                       # required CRM fields for worker
+                       my $ip   = $user->{IP_UREDAJA}      || next;
+                       my $port = $user->{SHELF_SLOT_PORT} || next;
+                       my $username = $user->{USERNAME}    || next;
 
                        if ( $ip !~ /$RE{net}{IPv4}/ ) {
                                $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
                                next;
                        } elsif ( $port !~ m{\d+(/\d+)+} ) {
-                               $redis->sadd("$variant.$ip.invalid.PORT" => $port);
+                               $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
+                               next;
+                       } elsif ( ! $username ) {
+                               $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
                                next;
                        } elsif ( $seen->{ $ip }->{ $port }++ ) {
-                               $redis->sadd("$variant.$ip.duplicate" => $port);
+                               $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
                                next;
                        }
-                       $redis->incr("$variant.queued");
+
+                       $redis->incr("$variant.poll.queued");
                        $redis->sadd("$variant.$ip.queued" => $port);
 
                        my $name = $variant . '_' . $ip;
@@ -179,14 +193,27 @@ $driver->add_job({
                warn "# wait";
                $taskset->wait;
 
-               $redis->set( "$variant.time.finish" => $self->datetime_now );
-               warn "# seen ", dump($seen);
+               $redis->set( "$variant.poll.finish" => $self->datetime_now );
+
+               my $status = APKPM::Model->redis_status;
+               my $row = $status->{poll}->{$variant};
+               $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
+               $row->{variant} = $variant;
+               $self->do_background_json('Store_insert', { _table => 'dslam_poll', %$row });
+               warn "# dslam_poll ",dump($row);
 
                my $ips = scalar keys %$seen;
-               return "polled $ips IPs"; # body end
+               my $ports = 0;
+               $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
+               warn "# finish $ips ips with $ports ports\n";
+               return { ips => $ips, ports => $ports, dslam_poll => $row }  ; # body end
                }
        } ],
 });
 
+open(my $pid, '>', "/tmp/apkpm.$variant.pid");
+print $pid "$$\n";
+close $pid;
+
 $driver->run;