# Warning filter: a message is passed on to warn only when it begins with one
# or more '#' characters and the count of those leading characters is not
# greater than $ENV{DEBUG}. As written on this line, any other message is dropped.
# NOTE(review): $ENV{DEBUG} may be undefined, in which case it compares as 0;
# a trailing statement modifier, if any, is not visible here - confirm.
4 $SIG{'__WARN__'} = sub { warn $_[0] if $_[0] =~ m/^(#+)/ && length($1) <= $ENV{DEBUG} }
# Package Cog: subclass of Gearman::Driver::Worker::Base that also pulls in
# APKPM::Gearman and APKPM::Gearman::Client through "with".
7 package Cog; # persistent Gearman Worker - see the pun?
11 extends 'Gearman::Driver::Worker::Base';
14 with 'APKPM::Gearman';
15 with 'APKPM::Gearman::Client';
22 use Data::Dump qw(dump);
24 use Regexp::Common qw(net);
# Command-line arguments: the device variant (falls back to ZTEDSLAM when
# empty) and a cap on the number of worker processes.
# NOTE(review): no default for $max_processes is visible - confirm callers always supply it.
32 my ($variant,$max_processes) = @ARGV;
33 $variant ||= 'ZTEDSLAM';
35 warn "variant: $variant max_processes: $max_processes\n";
# Gearman::Driver instance; the visible options point it at localhost:4730
# and at a log file named after the variant.
37 my $driver = Gearman::Driver->new(
38 server => 'localhost:4730',
41 logfile => "log/$variant.log",
# $w1 is defined on lines that are not visible here.
45 my $worker = $w1->new();
# The process title advertises the CRM query about to run.
47 $0 = "CRM_search TIP_UREDJAJA:$variant";
# Run the CRM_search job with the query TIP_UREDJAJA:variant; the result is
# iterated below as a list of hash references.
49 my $crm = $worker->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
# Build $poll as a two-level count keyed by IP_UREDAJA, then SHELF_SLOT_PORT.
# Records whose IP_UREDAJA does not match the IPv4 pattern are skipped.
55 foreach my $user ( @$crm ) {
56 next unless $user->{IP_UREDAJA} =~ /$RE{net}{IPv4}/;
57 $poll->{ $user->{IP_UREDAJA} }->{ $user->{SHELF_SLOT_PORT} }++;
60 warn "# poll = ", dump($poll);
# One iteration per device IP collected in $poll.
62 foreach my $ip ( keys %$poll ) {
# Per-device function name: variant, underscore, IP.
64 my $method = $variant . '_' . $ip;
# The device class is selected by variant name inside the H1:: namespace and
# instantiated with this IP.
66 my $module = 'H1::' . $variant;
67 my $zte = $module->new( ip => $ip );
# keys in scalar context: number of distinct ports recorded for this IP.
69 my $ports = keys %{ $poll->{$ip} };
70 my $max_procs = ( int($ports/50) ) + 1; # one process for 50 ports
# NOTE(review): if $max_processes is unset it compares as 0 here and would
# overwrite $max_procs with undef - confirm it is always supplied.
71 $max_procs = $max_processes if $max_processes < $max_procs; # hard-limit for maximum ports in parallel
# The computed process count is passed on as max_processes; the enclosing
# call is not visible here.
74 max_processes => $max_procs,
# Body of the per-device job. Receives the worker object, the job and a
# workload hash reference ($crm); it closes over $variant, $ip, $method and $zte.
87 my ( $self, $job, $crm ) = @_;
89 my $redis = Redis->new;
# A workload containing a logout key only records the current time in the
# variant.ip.logout set and returns a plain string.
91 if ( exists $crm->{logout} ) {
93 $redis->sadd("$variant.$ip.logout", $self->datetime_now);
94 return "logout ok"; # XXX return scalar!
# Both fields are mandatory: a false value makes the job die.
97 my $port = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
98 my $username = $crm->{USERNAME} || die "no USERNAME";
100 $0 = "$method $port $username"; # process name
# Call hash() on the device object for this port inside eval so that an
# exception ends up in $@ instead of aborting the job.
103 eval { $hash = $zte->hash( $port ) };
# Outcome bookkeeping in per-device Redis sets: the error text, or the port
# under empty / ok. The error and empty cases return an error hash.
106 $redis->sadd("$variant.$ip.error" => $@);
107 return { error => $@ };
108 } elsif ( ! $hash ) {
109 $redis->sadd("$variant.$ip.empty" => $port);
110 return { error => 'empty' };
112 $redis->sadd("$variant.$ip.ok" => $port);
# Queue a background Store_insert job; the fields visible here are ip,
# username and timestamp (the rest of the payload is not visible).
115 $self->do_background_json( 'Store_insert', {
117 ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
118 username => $username,
119 timestamp => $self->datetime_now,
# The value obtained from the device is the job result.
125 return $hash; # body end
# Log the function name, device IP, port count and process count chosen above.
133 warn "$method for $ip with $ports ports using $max_procs processes\n";
# The name built from the variant plus _poll appears twice in what follows;
# the enclosing calls are not visible here.
141 name => $variant . '_poll',
143 name => $variant . '_poll',
# Body of the poll job: fetch CRM records, submit one task per accepted
# record to the matching per-device function, submit one logout task per
# device, then record and return a summary of the run.
147 my ( $self, $job, $workload ) = @_;
149 $0 = $variant . '_poll CRM_search';
151 my $crm = $self->do( 'CRM_search' => "TIP_UREDJAJA:$variant" );
# Gearman client and task set used to submit the tasks below; the server
# comes from $self->server, falling back to localhost:4730.
155 my $gc = Gearman::Client->new;
156 $gc->job_servers( $self->server || 'localhost:4730' );
157 my $taskset = $gc->new_task_set;
# Delete every Redis key whose name starts with the variant and a dot.
160 my $redis = Redis->new;
161 $redis->del( $_ ) foreach $redis->keys("$variant.*");
163 $0 = $variant . '_poll start';
164 $redis->set( "$variant.poll.start" => $self->datetime_now );
166 foreach my $user ( @$crm ) {
167 # required CRM fields for worker
168 my $ip = $user->{IP_UREDAJA} || next;
169 my $port = $user->{SHELF_SLOT_PORT} || next;
170 my $username = $user->{USERNAME} || next;
# Validation chain: a rejected record is noted in a Redis set named after
# the reason for rejection.
172 if ( $ip !~ /$RE{net}{IPv4}/ ) {
173 $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
# The port must contain digit groups joined by at least one slash; the
# pattern is not anchored.
175 } elsif ( $port !~ m{\d+(/\d+)+} ) {
176 $redis->sadd("$variant.$ip.invalid.SHELF_SLOT_PORT" => $port);
# NOTE(review): $username was already required to be true above, so this
# branch looks unreachable; it also stores the port, not the username - confirm.
178 } elsif ( ! $username ) {
179 $redis->sadd("$variant.$ip.invalid.USERNAME" => $port);
# The post-increment makes the second and later occurrence of the same
# ip/port pair count as a duplicate.
181 } elsif ( $seen->{ $ip }->{ $port }++ ) {
182 $redis->sadd("$variant.$ip.invalid.duplicate" => $port);
# Accepted record: bump the queued counter, remember the port and submit the
# record, encoded with e_json, to the function named variant, underscore, IP.
186 $redis->incr("$variant.poll.queued");
187 $redis->sadd("$variant.$ip.queued" => $port);
189 my $name = $variant . '_' . $ip;
190 $taskset->add_task( $name, $self->e_json( $user ), {
# Callbacks record the port in the complete or failed set of the device.
191 on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
192 on_fail => sub { $redis->sadd("$variant.$ip.failed" => $port) },
196 warn "# queue logouts";
# One logout task per device IP seen; the process title carries a running
# counter ($n is declared on a line not visible here).
198 foreach my $ip ( keys %$seen ) {
199 $taskset->add_task( $variant . '_' . $ip, $self->e_json( { logout => 1 } ) );
200 $0 = $variant . '_poll ' . $n++;
# NOTE(review): the call that waits for the task set is presumably on the
# lines missing between here and the finish timestamp - confirm.
203 $0 = $variant . "_poll wait";
207 $redis->set( "$variant.poll.finish" => $self->datetime_now );
# Build the dslam_poll row from APKPM::Model->redis_status: the entry of this
# variant under poll, plus the start and finish values and the variant name;
# then queue it for Store_insert in the background.
209 my $status = APKPM::Model->redis_status;
210 my $row = $status->{poll}->{$variant};
211 $row->{$_} = $status->{"$variant.poll.$_"} foreach ( 'start', 'finish' );
212 $row->{variant} = $variant;
213 $self->do_background_json('Store_insert', { _table => 'dslam_poll', %$row });
214 warn "# dslam_poll ",dump($row);
# Summary: number of device IPs seen and total number of ports across them
# ($ports is declared on a line not visible here).
216 my $ips = scalar keys %$seen;
218 $ports += scalar keys %{ $seen->{$_} } foreach keys %$seen;
219 warn "# finish $ips ips with $ports ports\n";
220 $0 = $variant . "_poll last ips:$ips ports:$ports";
221 return { ips => $ips, ports => $ports, dslam_poll => $row } ; # body end
# Open a per-variant pid file under /tmp for writing.
# NOTE(review): the return value of open is not checked on this line - confirm
# that failure is handled elsewhere (for example by autodie).
226 open(my $pid, '>', "/tmp/apkpm.$variant.pid");