reorg stats collected in redis in variant.ip.description sets
authorDobrica Pavlinusic <dpavlin@rot13.org>
Wed, 25 May 2011 21:14:27 +0000 (23:14 +0200)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Wed, 25 May 2011 21:14:27 +0000 (23:14 +0200)
persistant_worker.pl

index 1478f4a..e8a666d 100755 (executable)
@@ -31,7 +31,7 @@ my $driver = Gearman::Driver->new(
        server   => 'localhost:4730',
        interval => 60,
        loglevel   => 'DEBUG',
-       logfile    => 'log/persistant.log',
+       logfile    => "log/$variant.log",
 ) || die $!;
 
 my $w1 = 'Cog';
@@ -69,35 +69,50 @@ foreach my $ip ( keys %$poll ) {
                                name   => $method,
                                body   => sub {
 
+# XXX -- worker body
+
                my ( $self, $job, $crm ) = @_;
 
+               my $redis = Redis->new;
+
                if ( exists $crm->{logout} ) {
                        $zte->logout;
-                       return;
+                       $redis->sadd("$variant.$ip.logout", $self->datetime_now);
+                       return { logout => 'ok' };
                }
 
-               my $hash;
-               eval { $hash = $zte->hash( $crm->{SHELF_SLOT_PORT} ) };
+               my $port     = $crm->{SHELF_SLOT_PORT} || die "no SHELF_SLOT_PORT";
+               my $username = $crm->{USERNAME}        || die "no USERNAME";
 
-               my $redis = Redis->new;
+               if ( ! $port ) {
+                       $redis->sadd("$variant.invalid.SHELF_SLOT_PORT" => $username );
+                       return "invalid port $port";
+               }
+
+               my $hash;
+               eval { $hash = $zte->hash( $port ) };
 
                if ( $@ ) {
-                       $redis->sadd("$variant.error" => $@);
-                       return { error => $@ };
+                       $redis->sadd("$variant.$ip.error" => $@);
+                       return "error: $@";
+               } elsif ( ! $hash ) {
+                       $redis->sadd("$variant.$ip.empty" => $port);
+                       return "empty";
                } else {
-                       $redis->sadd("$variant.ok" => $crm->{USERNAME});
+                       $redis->sadd("$variant.$ip.ok" => $port);
                }
 
                $self->do_background_json( 'Store_insert', {
                        _table => 'dslam_h',
                        ip => $crm->{IP_MANAGEMENT}, # FIXME IP_UREDAJA ?
-                       username => $crm->{USERNAME},
+                       username => $username,
                        timestamp => $self->datetime_now,
                        h => $self->to_hstore($hash),
                });
 
-#              return $hash; # body end
+               return "ok $username $ip $port"; # body end
 
+# XXX -- worker body
                                },
                        },
                ]
@@ -130,7 +145,7 @@ $driver->add_job({
                my $redis = Redis->new;
                $redis->del( $_ ) foreach $redis->keys("$variant.*");
 
-               $redis->set( "$variant.start" => $self->datetime_now );
+               $redis->set( "$variant.time.start" => $self->datetime_now );
 
                foreach my $user ( @$crm ) {
                        my $ip   = $user->{IP_UREDAJA};
@@ -140,19 +155,19 @@ $driver->add_job({
                                $redis->sadd("$variant.invalid.IP_UREDAJA" => $ip);
                                next;
                        } elsif ( $port !~ m{\d+(/\d+)+} ) {
-                               $redis->sadd("$variant.invalid.PORT" => $port);
+                               $redis->sadd("$variant.$ip.invalid.PORT" => $port);
                                next;
                        } elsif ( $seen->{ $ip }->{ $port }++ ) {
-                               $redis->sadd("$variant.invalid.duplicate" => "$ip $port");
+                               $redis->sadd("$variant.$ip.duplicate" => $port);
                                next;
                        }
-                       $redis->sadd("$variant.queued" => "$ip $port");
-                       $redis->incr("$variant.ip.$ip");
+                       $redis->incr("$variant.queued");
+                       $redis->sadd("$variant.$ip.queued" => $port);
 
                        my $name = $variant . '_' . $ip;
                        $taskset->add_task( $name, $self->e_json( $user ), {
-                               on_complete => sub { $redis->sadd("$variant.complete", "$ip $port") },
-                               on_fail     => sub { $redis->sadd("$variant.fail", "$ip $port") },
+                               on_complete => sub { $redis->sadd("$variant.$ip.complete" => $port) },
+                               on_fail     => sub { $redis->sadd("$variant.$ip.failed" => $port) },
                        } )
                }
 
@@ -164,7 +179,7 @@ $driver->add_job({
                warn "# wait";
                $taskset->wait;
 
-               $redis->set( "$variant.finish" => $self->datetime_now );
+               $redis->set( "$variant.time.finish" => $self->datetime_now );
                warn "# seen ", dump($seen);
 
                my $ips = scalar keys %$seen;