X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=persistant_worker.pl;h=42dabe86e62ae2a7ee3087ea76029ac29382105e;hb=b7ad83dc3edd53f4023f67440774c636e120cec2;hp=e8a666d3ec151eb6b0e58d619511dd46ba1ff740;hpb=ac1f7a47fd3da39f3539688940eedcc0ace04623;p=APKPM.git diff --git a/persistant_worker.pl b/persistant_worker.pl index e8a666d..42dabe8 100755 --- a/persistant_worker.pl +++ b/persistant_worker.pl @@ -1,5 +1,9 @@ #!/usr/bin/perl +BEGIN { + $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} } +} + package Cog; # persistant Gearman Worker - see the pun? use Moose; @@ -23,13 +27,16 @@ use Redis; use lib 'lib'; use H1::ZTEDSLAM; use H1::ZTEMSAN; +use APKPM::Model; -my $variant = $ARGV[0] || 'ZTEDSLAM'; -warn "variant: $variant\n"; +my ($variant,$max_processes) = @ARGV; +$variant ||= 'ZTEDSLAM'; +$max_processes ||= 1; +warn "variant: $variant max_processes: $max_processes\n"; my $driver = Gearman::Driver->new( server => 'localhost:4730', - interval => 60, + interval => 15, loglevel => 'DEBUG', logfile => "log/$variant.log", ) || die $!; @@ -57,15 +64,19 @@ foreach my $ip ( keys %$poll ) { my $module = 'H1::' . $variant; my $zte = $module->new( ip => $ip ); + my $ports = keys %{ $poll->{$ip} }; + my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports + $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel + $driver->add_job({ - max_processes => 1, # FIXME increase? - min_processes => 1, + max_processes => $max_procs, + min_processes => 0, name => $method, worker => $worker, methods => [ { decode => 'd_json', -# encode => 'e_json', + encode => 'e_json', name => $method, body => sub { @@ -78,39 +89,35 @@ foreach my $ip ( keys %$poll ) { if ( exists $crm->{logout} ) { $zte->logout; $redis->sadd("$variant.$ip.logout", $self->datetime_now); - return { logout => 'ok' }; + return "logout ok"; # XXX return scalar! } my $port = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT"; my $username = $crm->{USERNAME} || die "no USERNAME"; - if ( ! $port ) { - $redis->sadd("$variant.invalid.SHELF_SLOT_PORT" => $username ); - return "invalid port $port"; - } - my $hash; eval { $hash = $zte->hash( $port ) }; if ( $@ ) { $redis->sadd("$variant.$ip.error" => $@); - return "error: $@"; + return { error => $@ }; } elsif ( ! $hash ) { $redis->sadd("$variant.$ip.empty" => $port); - return "empty"; + return { error => 'empty' }; } else { $redis->sadd("$variant.$ip.ok" => $port); } $self->do_background_json( 'Store_insert', { - _table => 'dslam_h', + _table => 'dslam', ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ? username => $username, timestamp => $self->datetime_now, - h => $self->to_hstore($hash), + variant => $variant, + h => $hash, }); - return "ok $username $ip $port"; # body end + return $hash; # body end # XXX -- worker body }, @@ -118,17 +125,18 @@ foreach my $ip ( keys %$poll ) { ] }); - warn "$method added\n"; + warn "$method for $ip with $ports ports using $max_procs processes\n"; } $driver->add_job({ max_processes => 1, - min_processes => 1, + min_processes => 0, worker => $worker, - name => "poll_$variant", + name => $variant . '_poll', methods => [ { - name => "poll_$variant", + name => $variant . '_poll', + encode => 'e_json', body => sub { my ( $self, $job, $workload ) = @_; @@ -145,23 +153,29 @@ $driver->add_job({ my $redis = Redis->new; $redis->del( $_ ) foreach $redis->keys("$variant.*"); - $redis->set( "$variant.time.start" => $self->datetime_now ); + $redis->set( "$variant.poll.start" => $self->datetime_now ); foreach my $user ( @$crm ) { - my $ip = $user->{IP_UREDAJA}; - my $port = $user->{SHELF_SLOT_PORT}; + # required CRM fields for worker + my $ip = $user->{IP_UREDAJA} || next; + my $port = $user->{SHELF_SLOT_PORT} || next; + my $username = $user->{USERNAME} || next; if ( $ip !~ /$RE{net}{IPv4}/ ) { $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip); next; } elsif ( $port !~ m{\d+(/\d+)+} ) { - $redis->sadd("$variant.$ip.invalid.PORT" => $port); + $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port); + next; + } elsif ( ! $username ) { + $redis->sadd("$variant.$ip.invalid.USERNAME" => $port); next; } elsif ( $seen->{ $ip }->{ $port }++ ) { - $redis->sadd("$variant.$ip.duplicate" => $port); + $redis->sadd("$variant.$ip.invalid.duplicate" => $port); next; } - $redis->incr("$variant.queued"); + + $redis->incr("$variant.poll.queued"); $redis->sadd("$variant.$ip.queued" => $port); my $name = $variant . '_' . $ip; @@ -179,14 +193,27 @@ $driver->add_job({ warn "# wait"; $taskset->wait; - $redis->set( "$variant.time.finish" => $self->datetime_now ); - warn "# seen ", dump($seen); + $redis->set( "$variant.poll.finish" => $self->datetime_now ); + + my $status = APKPM::Model->redis_status; + my $row = $status->{poll}->{$variant}; + $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' ); + $row->{variant} = $variant; + $self->do_background_json('Store_insert', { _table => 'dslam_poll', %$row }); + warn "# dslam_poll ",dump($row); my $ips = scalar keys %$seen; - return "polled $ips IPs"; # body end + my $ports = 0; + $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen; + warn "# finish $ips ips with $ports ports\n"; + return { ips => $ips, ports => $ports, dslam_poll => $row } ; # body end } } ], }); +open(my $pid, '>', "/tmp/apkpm.$variant.pid"); +print $pid "$$\n"; +close $pid; + $driver->run;