#!/usr/bin/perl
+BEGIN {
+ $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
+}
+
package Cog; # persistant Gearman Worker - see the pun?
use Moose;
use lib 'lib';
use H1::ZTEDSLAM;
use H1::ZTEMSAN;
+use APKPM::Model;
-my $variant = $ARGV[0] || 'ZTEDSLAM';
-warn "variant: $variant\n";
+my ($variant,$max_processes) = @ARGV;
+$variant ||= 'ZTEDSLAM';
+$max_processes ||= 1;
+warn "variant: $variant max_processes: $max_processes\n";
my $driver = Gearman::Driver->new(
server => 'localhost:4730',
- interval => 60,
+ interval => 3,
loglevel => 'DEBUG',
logfile => "log/$variant.log",
) || die $!;
my $module = 'H1::' . $variant;
my $zte = $module->new( ip => $ip );
+ my $ports = keys %{ $poll->{$ip} };
+ my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
+ $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
+
$driver->add_job({
- max_processes => 1, # FIXME increase?
+ max_processes => $max_procs,
min_processes => 1,
name => $method,
worker => $worker,
methods => [
{
decode => 'd_json',
-# encode => 'e_json',
+ encode => 'e_json',
name => $method,
body => sub {
if ( exists $crm->{logout} ) {
$zte->logout;
$redis->sadd("$variant.$ip.logout", $self->datetime_now);
- return { logout => 'ok' };
+ return "logout ok"; # XXX return scalar!
}
my $port = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
my $username = $crm->{USERNAME} || die "no USERNAME";
- if ( ! $port ) {
- $redis->sadd("$variant.invalid.SHELF_SLOT_PORT" => $username );
- return "invalid port $port";
- }
-
my $hash;
eval { $hash = $zte->hash( $port ) };
if ( $@ ) {
$redis->sadd("$variant.$ip.error" => $@);
- return "error: $@";
+ return { error => $@ };
} elsif ( ! $hash ) {
$redis->sadd("$variant.$ip.empty" => $port);
- return "empty";
+ return { error => 'empty' };
} else {
$redis->sadd("$variant.$ip.ok" => $port);
}
$self->do_background_json( 'Store_insert', {
- _table => 'dslam_h',
+ _table => 'dslam',
ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
username => $username,
timestamp => $self->datetime_now,
- h => $self->to_hstore($hash),
+ variant => $variant,
+ h => $hash,
});
- return "ok $username $ip $port"; # body end
+ return $hash; # body end
# XXX -- worker body
},
]
});
- warn "$method added\n";
+ warn "$method for $ip with $ports ports using $max_procs processes\n";
}
$driver->add_job({
max_processes => 1,
- min_processes => 1,
+ min_processes => 0,
worker => $worker,
- name => "poll_$variant",
+ name => $variant . '_poll',
methods => [ {
- name => "poll_$variant",
+ name => $variant . '_poll',
+ encode => 'e_json',
body => sub {
my ( $self, $job, $workload ) = @_;
my $redis = Redis->new;
$redis->del( $_ ) foreach $redis->keys("$variant.*");
- $redis->set( "$variant.time.start" => $self->datetime_now );
+ $redis->set( "$variant.poll.start" => $self->datetime_now );
foreach my $user ( @$crm ) {
- my $ip = $user->{IP_UREDAJA};
- my $port = $user->{SHELF_SLOT_PORT};
+ # required CRM fields for worker
+ my $ip = $user->{IP_UREDAJA} || next;
+ my $port = $user->{SHELF_SLOT_PORT} || next;
+ my $username = $user->{USERNAME} || next;
if ( $ip !~ /$RE{net}{IPv4}/ ) {
$redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
next;
} elsif ( $port !~ m{\d+(/\d+)+} ) {
- $redis->sadd("$variant.$ip.invalid.PORT" => $port);
+ $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
+ next;
+ } elsif ( ! $username ) {
+ $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
next;
} elsif ( $seen->{ $ip }->{ $port }++ ) {
- $redis->sadd("$variant.$ip.duplicate" => $port);
+ $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
next;
}
- $redis->incr("$variant.queued");
+
+ $redis->incr("$variant.poll.queued");
$redis->sadd("$variant.$ip.queued" => $port);
my $name = $variant . '_' . $ip;
warn "# wait";
$taskset->wait;
- $redis->set( "$variant.time.finish" => $self->datetime_now );
- warn "# seen ", dump($seen);
+ $redis->set( "$variant.poll.finish" => $self->datetime_now );
+
+ my $status = APKPM::Model->redis_status;
+ my $row = $status->{poll}->{$variant};
+ $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
+ $row->{variant} = $variant;
+ $self->do_background_json('Store_insert', { _table => 'dslam_poll', %$row });
+ warn "# dslam_poll ",dump($row);
my $ips = scalar keys %$seen;
- return "polled $ips IPs"; # body end
+ my $ports = 0;
+ $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
+ warn "# finish $ips ips with $ports ports\n";
+ return { ips => $ips, ports => $ports, dslam_poll => $row } ; # body end
}
} ],
});
+open(my $pid, '>', "/tmp/apkpm.$variant.pid");
+print $pid "$$\n";
+close $pid;
+
$driver->run;