X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=persistant_worker.pl;h=29a0ece2f23e3833ec3b59f8a81a57d57357c720;hb=4e897268ea351ae8a2e285ae62203a561b3f41ba;hp=cefe70f309a6268f3fed2347f76494ca44354ba3;hpb=2cf1da5cd8f1c87ddccb741a1143046d70417bdb;p=APKPM.git diff --git a/persistant_worker.pl b/persistant_worker.pl index cefe70f..29a0ece 100755 --- a/persistant_worker.pl +++ b/persistant_worker.pl @@ -1,64 +1,218 @@ #!/usr/bin/perl +BEGIN { + $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} } +} + package Cog; # persistant Gearman Worker - see the pun? use Moose; use JSON::XS qw(); extends 'Gearman::Driver::Worker::Base'; -sub prefix { '' } +use lib 'lib'; +with 'APKPM::Gearman'; +with 'APKPM::Gearman::Client'; -sub encode_json { - my ( $self, $result ) = @_; - return JSON::XS::encode_json($result); -} - -sub decode_json { - my ( $self, $workload ) = @_; - return JSON::XS::decode_json($workload); -} +sub prefix { '' } package main; use Gearman::Driver; +use Data::Dump qw(dump); + +use Regexp::Common qw(net); +use Redis; + +use lib 'lib'; +use H1::ZTEDSLAM; +use H1::ZTEMSAN; +use APKPM::Model; + +my ($variant,$max_processes) = @ARGV; +$variant ||= 'ZTEDSLAM'; +$max_processes ||= 1; +warn "variant: $variant max_processes: $max_processes\n"; my $driver = Gearman::Driver->new( server => 'localhost:4730', - interval => 60, + interval => 15, loglevel => 'DEBUG', - logfile => 'log/persistant.log', + logfile => "log/$variant.log", ) || die $!; my $w1 = 'Cog'; my $worker = $w1->new(); -$worker->meta->add_method( 'scale_image' => sub { - my ( $self, $job, $workload ) = @_; - warn "# scale_image $job $workload"; - # do something - return { scale_image => $workload }; -}); +my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" ); -# run each method in an own process -foreach my $method (qw(scale_image )) { - $driver->add_job( - { - max_processes => 5, - min_processes => 1, - name => $method, - worker => $worker, - methods => [ - { - body => $w1->meta->find_method_by_name($method)->body, -# decode => 'decode_json', - encode => 'encode_json', - name => $method, - }, - ] + +my $poll; + +foreach my $user ( @$crm ) { + next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/; + $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++; +} + +warn "# poll = ", dump($poll); + +foreach my $ip ( keys %$poll ) { + + my $method = $variant . '_' . $ip; + + my $module = 'H1::' . $variant; + my $zte = $module->new( ip => $ip ); + + my $ports = keys %{ $poll->{$ip} }; + my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports + $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel + + $driver->add_job({ + max_processes => $max_procs, + min_processes => 0, + name => $method, + worker => $worker, + methods => [ + { + decode => 'd_json', +# encode => 'e_json', + name => $method, + body => sub { + +# XXX -- worker body + + my ( $self, $job, $crm ) = @_; + + my $redis = Redis->new; + + if ( exists $crm->{logout} ) { + $zte->logout; + $redis->sadd("$variant.$ip.logout", $self->datetime_now); + return "logout ok"; # XXX return scalar! + } + + my $port = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT"; + my $username = $crm->{USERNAME} || die "no USERNAME"; + + my $hash; + eval { $hash = $zte->hash( $port ) }; + + if ( $@ ) { + $redis->sadd("$variant.$ip.error" => $@); + return "error: $@"; + } elsif ( ! $hash ) { + $redis->sadd("$variant.$ip.empty" => $port); + return "empty"; + } else { + $redis->sadd("$variant.$ip.ok" => $port); } - ); + + $self->do_background_json( 'Store_insert', { + _table => 'dslam_h', + ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ? + username => $username, + timestamp => $self->datetime_now, + h => $self->to_hstore($hash), + }); + + return "ok $username $ip $port"; # body end + +# XXX -- worker body + }, + }, + ] + }); + + warn "$method for $ip with $ports ports using $max_procs processes\n"; + } +$driver->add_job({ + max_processes => 1, + min_processes => 0, + worker => $worker, + name => "poll_$variant", + methods => [ { + name => "poll_$variant", + encode => 'e_json', + body => sub { + + my ( $self, $job, $workload ) = @_; + + my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" ); + + warn "# crm $crm"; + + my $gc = Gearman::Client->new; + $gc->job_servers( $self->server || 'localhost:4730' ); + my $taskset = $gc->new_task_set; + my $seen; + + my $redis = Redis->new; + $redis->del( $_ ) foreach $redis->keys("$variant.*"); + + $redis->set( "$variant.poll.start" => $self->datetime_now ); + + foreach my $user ( @$crm ) { + # required CRM fields for worker + my $ip = $user->{IP_UREDAJA} || next; + my $port = $user->{SHELF_SLOT_PORT} || next; + my $username = $user->{USERNAME} || next; + + if ( $ip !~ /$RE{net}{IPv4}/ ) { + $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip); + next; + } elsif ( $port !~ m{\d+(/\d+)+} ) { + $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port); + next; + } elsif ( ! $username ) { + $redis->sadd("$variant.$ip.invalid.USERNAME" => $port); + next; + } elsif ( $seen->{ $ip }->{ $port }++ ) { + $redis->sadd("$variant.$ip.invalid.duplicate" => $port); + next; + } + + $redis->incr("$variant.poll.queued"); + $redis->sadd("$variant.$ip.queued" => $port); + + my $name = $variant . '_' . $ip; + $taskset->add_task( $name, $self->e_json( $user ), { + on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) }, + on_fail => sub { $redis->sadd("$variant.$ip.failed" => $port) }, + } ) + } + + warn "# queue logouts"; + foreach my $ip ( keys %$seen ) { + $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) ); + } + + warn "# wait"; + $taskset->wait; + + $redis->set( "$variant.poll.finish" => $self->datetime_now ); + + my $status = APKPM::Model->redis_status; + my $row = $status->{poll}->{$variant}; + $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' ); + $row->{variant} = $variant; + $self->do_background_json('Store_insert', { _table => 'zte_poll', %$row }); + warn "# zte_poll ",dump($row); + + my $ips = scalar keys %$seen; + my $ports = 0; + $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen; + warn "# finish $ips ips with $ports ports\n"; + return { ips => $ips, ports => $ports, zte_poll => $row } ; # body end + } + } ], +}); + +open(my $pid, '>', "/tmp/apkpm.$variant.pid"); +print $pid "$$\n"; +close $pid; + $driver->run;