major storage refactor into single insert function
authorDobrica Pavlinusic <dpavlin@rot13.org>
Wed, 23 Mar 2011 22:38:44 +0000 (23:38 +0100)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Wed, 23 Mar 2011 22:38:44 +0000 (23:38 +0100)
This change also generates timestamps at correct places inside
perl code and store them in redis for inspection

This allows us to have correct event timestamps, regarding of queue for store
(which delayed now() trigger in database which we used before)

Additional benefit is that we don't have to finght different operating system
and database timezones.

gearman_driver.pl
lib/APKPM/Davolink.pm
lib/APKPM/Poll.pm
lib/APKPM/Store.pm
lib/APKPM/VOICE.pm
t/Store.t

index 2292288..268519b 100755 (executable)
@@ -25,6 +25,9 @@ my $driver = Gearman::Driver->new(
                'APKPM::Davolink::info' => {
                        max_processes => 60,
                },
+               'APKPM::Store::insert' => {
+                       max_processes => 10,
+               },
        },
 );
 $driver->run;
index 7fe7a14..b988acb 100644 (file)
@@ -32,7 +32,13 @@ sub info : Job : Decode(d_array) : Encode(e_json) {
                        $ret->{UPTIME} = $cpeconnect->custom("cat /proc/uptime");
                } elsif ( $param =~ m/adsl/i ) {
                        $ret->{ADSL} = { $cpeconnect->adsl_info };
-                       $self->do_background_json( 'Store_ADSL', $ret );
+                       $self->do_background_json( 'Store_insert', {
+                               _table => 'adsl',
+                               ip => $cpeip,
+                               username => $username,
+                               timestamp => $self->datetime_now,
+                               %{$ret->{ADSL}}
+                       });
                        my $redis = Redis->new;
                        $redis->sadd( 'poll.adsl.ok' => $cpeip );
                } elsif ( $param =~ m/wan/i ) {
index 5b1470b..20372e3 100644 (file)
@@ -43,8 +43,7 @@ sub by_prefix : Job : Encode(e_json) {
        my $finish = $self->datetime_now;
        $redis->set( 'poll.finish' => $finish );
 
-       my $poll = { _table => 'poll' }; # Store_insert_table arg
-
+       my $poll;
        foreach my $k ( $redis->keys('poll.*') ) {
                my $n = $k;
                $n =~ s/^poll\.//;
@@ -53,7 +52,7 @@ sub by_prefix : Job : Encode(e_json) {
        }
 
        warn dump $poll;
-       $self->do_background_json('Store_insert_table', $poll);
+       $self->do_background_json('Store_insert', { _table => 'poll', %$poll });
 
        return { ldap => $ip_username, results => $results, poll => $poll };
 }
@@ -77,7 +76,13 @@ warn "XXX error: $ip";
        }
 
        $ping->{username} = $username;
-       $self->do_background_json( 'Store_ping', $ping );
+       $ping->{timestamp} = $self->datetime_now;
+       $self->do_background_json( 'Store_insert', {
+                _table => 'ping',
+               username => $username,
+               timestamp => $self->datetime_now,
+               %$ping
+       });
 
        $self->do_background( 'Davolink_info', "$ip $username adsl" );
 
index c617b5e..d63f481 100644 (file)
@@ -35,31 +35,7 @@ sub pg_insert {
        $sth->execute( map { $h->{$_} } @c );
 }
 
-sub ADSL : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-
-       my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
-       foreach my $c ( qw(ip username) ) {
-               $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
-       }
-
-       warn "# ADSL ", dump $h;
-
-       $self->pg_insert(adsl => $h)
-}
-
-sub ping : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-
-       $self->pg_insert(ping => $workload);
-}
-
-sub voice : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-       $self->pg_insert(voice => $workload);
-}
-
-sub insert_table : Job : Decode(d_json) {
+sub insert : Job : Decode(d_json) {
        my ( $self, $job, $workload ) = @_;
        my $table = delete $workload->{_table} || die "no _table";
        $self->pg_insert($table => $workload);
index a59953c..b17ecdd 100644 (file)
@@ -35,7 +35,13 @@ sub info : Job : Encode(e_json) {
        $voice->connectSAU( $sau->[ $sru{serverid} ] );
         my %sau = $voice->searchSAU($sru{alias});
 
-       $self->do_background_json( 'Store_voice', { broj => $broj, state => $sau{state}, serverid => $sru{serverid} });
+       $self->do_background_json( 'Store_insert', {
+               _table => 'voice',
+               timestamp => $self->datetime_now,
+               broj => $broj,
+               state => $sau{state},
+               serverid => $sru{serverid},
+       });
 
        return { broj => $workload, sru => \%sru, sau => \%sau };
 }
index 2bfa48c..1370ba2 100755 (executable)
--- a/t/Store.t
+++ b/t/Store.t
@@ -11,25 +11,25 @@ use_ok 'APKPM::Store';
 
 ok my $o = APKPM::Store->new, 'new';
 
-ok my $r = $o->ADSL('job',{
+ok my $r = $o->insert('job',{
+_table => 'adsl',
 ip => '127.0.0.1',
 username => 'nobody',
-ADSL => {
-       ATTNRX => "36.5",
-       ATTNTX => "17.8",
-       MAXRX  => 13500,
-       MAXTX  => 880,
-       PWRRX  => "0.0",
-       PWRTX  => "12.6",
-       RX     => 8500,
-       SNRRX  => "11.4",
-       SNRTX  => "16.0",
-       TX     => 798,
-}
+ATTNRX => "36.5",
+ATTNTX => "17.8",
+MAXRX  => 13500,
+MAXTX  => 880,
+PWRRX  => "0.0",
+PWRTX  => "12.6",
+RX     => 8500,
+SNRRX  => "11.4",
+SNRTX  => "16.0",
+TX     => 798,
 }), 'ADSL';
 diag dump($r);
 
-ok my $r = $o->ping('job',{
+ok my $r = $o->insert('job',{
+_table => 'ping',
 ip => '127.0.0.1',
 username => 'nobody',
 rtt => 0.042,