Merge branch 'devel' of h1dev:/srv/APKPM/
authorDobrica Pavlinusic <dpavlin@rot13.org>
Wed, 23 Mar 2011 22:42:51 +0000 (22:42 +0000)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Wed, 23 Mar 2011 22:42:51 +0000 (22:42 +0000)
gearman_driver.pl
lib/APKPM/Davolink.pm
lib/APKPM/Gearman.pm
lib/APKPM/Poll.pm
lib/APKPM/Store.pm
lib/APKPM/VOICE.pm
t/Store.t

index 2292288..268519b 100755 (executable)
@@ -25,6 +25,9 @@ my $driver = Gearman::Driver->new(
                'APKPM::Davolink::info' => {
                        max_processes => 60,
                },
+               'APKPM::Store::insert' => {
+                       max_processes => 10,
+               },
        },
 );
 $driver->run;
index 7fe7a14..b988acb 100644 (file)
@@ -32,7 +32,13 @@ sub info : Job : Decode(d_array) : Encode(e_json) {
                        $ret->{UPTIME} = $cpeconnect->custom("cat /proc/uptime");
                } elsif ( $param =~ m/adsl/i ) {
                        $ret->{ADSL} = { $cpeconnect->adsl_info };
-                       $self->do_background_json( 'Store_ADSL', $ret );
+                       $self->do_background_json( 'Store_insert', {
+                               _table => 'adsl',
+                               ip => $cpeip,
+                               username => $username,
+                               timestamp => $self->datetime_now,
+                               %{$ret->{ADSL}}
+                       });
                        my $redis = Redis->new;
                        $redis->sadd( 'poll.adsl.ok' => $cpeip );
                } elsif ( $param =~ m/wan/i ) {
index 725da10..8a6ef80 100644 (file)
@@ -5,6 +5,9 @@ use Moose::Role;
 use Data::Dump qw(dump);
 use JSON::XS;
 
+use DateTime;
+use DateTime::Format::Pg;
+
 sub e_json {
        my ( $self, $result ) = @_;
        warn "# e_json ", dump $result;
@@ -24,4 +27,8 @@ sub d_array {
 
 }
 
+sub datetime_now {
+       return DateTime::Format::Pg->format_datetime( DateTime->now );
+}
+
 1;
index d9fb4fd..20372e3 100644 (file)
@@ -5,8 +5,6 @@ use Moose;
 use Time::HiRes qw(time);
 use Data::Dump qw(dump);
 use Redis;
-use DateTime;
-use DateTime::Format::Pg;
 
 with 'APKPM::Gearman::Client';
 with 'APKPM::Gearman';
@@ -16,7 +14,12 @@ sub prefix { 'poll_' }
 sub by_prefix : Job : Encode(e_json) {
        my ( $self, $job, $workload ) = @_;
 
-       my $start = DateTime->now();
+       my $redis = Redis->new;
+       $redis->del( $_ ) foreach $redis->keys('poll.*');
+
+       my $start = $self->datetime_now;
+
+       $redis->set( 'poll.start' => $start );
 
        my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
 
@@ -25,9 +28,6 @@ sub by_prefix : Job : Encode(e_json) {
                $ip_username->{ $entry->{cn} } = $1 if $entry->{dhcpStatements} =~ m/fixed-address\s+(\S+)/;
        }
 
-       my $redis = Redis->new;
-       $redis->del( $_ ) foreach $redis->keys('poll.*');
-
        my $taskset = $self->gc->new_task_set;
        my $results;
        while (my ($username,$ip) = each %$ip_username) {
@@ -40,24 +40,21 @@ sub by_prefix : Job : Encode(e_json) {
        warn "# wait";
        $taskset->wait;
 
-       my $finish = DateTime->now();
+       my $finish = $self->datetime_now;
+       $redis->set( 'poll.finish' => $finish );
 
        my $poll;
        foreach my $k ( $redis->keys('poll.*') ) {
                my $n = $k;
                $n =~ s/^poll\.//;
                $n =~ s/\./_/g;
-               $poll->{$n} = $redis->scard($k);
+               $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
        }
-       $poll->{start}  = DateTime::Format::Pg->format_datetime($start);
-       $poll->{finish} = DateTime::Format::Pg->format_datetime($finish);
-
-       $poll->{_table} = 'poll';
 
        warn dump $poll;
-       $self->do_background_json('Store_insert_table', $poll);
+       $self->do_background_json('Store_insert', { _table => 'poll', %$poll });
 
-       return { ldap => $ip_username, results => $results, duration => $finish - $start, poll => $poll };
+       return { ldap => $ip_username, results => $results, poll => $poll };
 }
 
 sub ip_username : Job : Decode(d_array) : Encode(e_json) {
@@ -79,7 +76,13 @@ warn "XXX error: $ip";
        }
 
        $ping->{username} = $username;
-       $self->do_background_json( 'Store_ping', $ping );
+       $ping->{timestamp} = $self->datetime_now;
+       $self->do_background_json( 'Store_insert', {
+                _table => 'ping',
+               username => $username,
+               timestamp => $self->datetime_now,
+               %$ping
+       });
 
        $self->do_background( 'Davolink_info', "$ip $username adsl" );
 
index c617b5e..d63f481 100644 (file)
@@ -35,31 +35,7 @@ sub pg_insert {
        $sth->execute( map { $h->{$_} } @c );
 }
 
-sub ADSL : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-
-       my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
-       foreach my $c ( qw(ip username) ) {
-               $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
-       }
-
-       warn "# ADSL ", dump $h;
-
-       $self->pg_insert(adsl => $h)
-}
-
-sub ping : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-
-       $self->pg_insert(ping => $workload);
-}
-
-sub voice : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-       $self->pg_insert(voice => $workload);
-}
-
-sub insert_table : Job : Decode(d_json) {
+sub insert : Job : Decode(d_json) {
        my ( $self, $job, $workload ) = @_;
        my $table = delete $workload->{_table} || die "no _table";
        $self->pg_insert($table => $workload);
index a59953c..b17ecdd 100644 (file)
@@ -35,7 +35,13 @@ sub info : Job : Encode(e_json) {
        $voice->connectSAU( $sau->[ $sru{serverid} ] );
         my %sau = $voice->searchSAU($sru{alias});
 
-       $self->do_background_json( 'Store_voice', { broj => $broj, state => $sau{state}, serverid => $sru{serverid} });
+       $self->do_background_json( 'Store_insert', {
+               _table => 'voice',
+               timestamp => $self->datetime_now,
+               broj => $broj,
+               state => $sau{state},
+               serverid => $sru{serverid},
+       });
 
        return { broj => $workload, sru => \%sru, sau => \%sau };
 }
index 2bfa48c..1370ba2 100755 (executable)
--- a/t/Store.t
+++ b/t/Store.t
@@ -11,25 +11,25 @@ use_ok 'APKPM::Store';
 
 ok my $o = APKPM::Store->new, 'new';
 
-ok my $r = $o->ADSL('job',{
+ok my $r = $o->insert('job',{
+_table => 'adsl',
 ip => '127.0.0.1',
 username => 'nobody',
-ADSL => {
-       ATTNRX => "36.5",
-       ATTNTX => "17.8",
-       MAXRX  => 13500,
-       MAXTX  => 880,
-       PWRRX  => "0.0",
-       PWRTX  => "12.6",
-       RX     => 8500,
-       SNRRX  => "11.4",
-       SNRTX  => "16.0",
-       TX     => 798,
-}
+ATTNRX => "36.5",
+ATTNTX => "17.8",
+MAXRX  => 13500,
+MAXTX  => 880,
+PWRRX  => "0.0",
+PWRTX  => "12.6",
+RX     => 8500,
+SNRRX  => "11.4",
+SNRTX  => "16.0",
+TX     => 798,
 }), 'ADSL';
 diag dump($r);
 
-ok my $r = $o->ping('job',{
+ok my $r = $o->insert('job',{
+_table => 'ping',
 ip => '127.0.0.1',
 username => 'nobody',
 rtt => 0.042,