'APKPM::Davolink::info' => {
max_processes => 60,
},
+ 'APKPM::Store::insert' => {
+ max_processes => 10,
+ },
},
);
$driver->run;
$ret->{UPTIME} = $cpeconnect->custom("cat /proc/uptime");
} elsif ( $param =~ m/adsl/i ) {
$ret->{ADSL} = { $cpeconnect->adsl_info };
- $self->do_background_json( 'Store_ADSL', $ret );
+ $self->do_background_json( 'Store_insert', {
+ _table => 'adsl',
+ ip => $cpeip,
+ username => $username,
+ timestamp => $self->datetime_now,
+ %{$ret->{ADSL}}
+ });
my $redis = Redis->new;
$redis->sadd( 'poll.adsl.ok' => $cpeip );
} elsif ( $param =~ m/wan/i ) {
use Data::Dump qw(dump);
use JSON::XS;
+use DateTime;
+use DateTime::Format::Pg;
+
sub e_json {
my ( $self, $result ) = @_;
warn "# e_json ", dump $result;
}
+sub datetime_now {
+ return DateTime::Format::Pg->format_datetime( DateTime->now );
+}
+
1;
use Time::HiRes qw(time);
use Data::Dump qw(dump);
use Redis;
-use DateTime;
-use DateTime::Format::Pg;
with 'APKPM::Gearman::Client';
with 'APKPM::Gearman';
sub by_prefix : Job : Encode(e_json) {
my ( $self, $job, $workload ) = @_;
- my $start = DateTime->now();
+ my $redis = Redis->new;
+ $redis->del( $_ ) foreach $redis->keys('poll.*');
+
+ my $start = $self->datetime_now;
+
+ $redis->set( 'poll.start' => $start );
my $entries = $self->do( 'LDAP_search' => "(&(cn=$workload*)(dhcpStatements=*))" );
$ip_username->{ $entry->{cn} } = $1 if $entry->{dhcpStatements} =~ m/fixed-address\s+(\S+)/;
}
- my $redis = Redis->new;
- $redis->del( $_ ) foreach $redis->keys('poll.*');
-
my $taskset = $self->gc->new_task_set;
my $results;
while (my ($username,$ip) = each %$ip_username) {
warn "# wait";
$taskset->wait;
- my $finish = DateTime->now();
+ my $finish = $self->datetime_now;
+ $redis->set( 'poll.finish' => $finish );
my $poll;
foreach my $k ( $redis->keys('poll.*') ) {
my $n = $k;
$n =~ s/^poll\.//;
$n =~ s/\./_/g;
- $poll->{$n} = $redis->scard($k);
+ $poll->{$n} = eval { $redis->scard($k) } || $redis->get($k);
}
- $poll->{start} = DateTime::Format::Pg->format_datetime($start);
- $poll->{finish} = DateTime::Format::Pg->format_datetime($finish);
-
- $poll->{_table} = 'poll';
warn dump $poll;
- $self->do_background_json('Store_insert_table', $poll);
+ $self->do_background_json('Store_insert', { _table => 'poll', %$poll });
- return { ldap => $ip_username, results => $results, duration => $finish - $start, poll => $poll };
+ return { ldap => $ip_username, results => $results, poll => $poll };
}
sub ip_username : Job : Decode(d_array) : Encode(e_json) {
}
$ping->{username} = $username;
- $self->do_background_json( 'Store_ping', $ping );
+ $ping->{timestamp} = $self->datetime_now;
+ $self->do_background_json( 'Store_insert', {
+ _table => 'ping',
+ username => $username,
+ timestamp => $self->datetime_now,
+ %$ping
+ });
$self->do_background( 'Davolink_info', "$ip $username adsl" );
$sth->execute( map { $h->{$_} } @c );
}
-sub ADSL : Job : Decode(d_json) {
- my ( $self, $job, $workload ) = @_;
-
- my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
- foreach my $c ( qw(ip username) ) {
- $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
- }
-
- warn "# ADSL ", dump $h;
-
- $self->pg_insert(adsl => $h)
-}
-
-sub ping : Job : Decode(d_json) {
- my ( $self, $job, $workload ) = @_;
-
- $self->pg_insert(ping => $workload);
-}
-
-sub voice : Job : Decode(d_json) {
- my ( $self, $job, $workload ) = @_;
- $self->pg_insert(voice => $workload);
-}
-
-sub insert_table : Job : Decode(d_json) {
+sub insert : Job : Decode(d_json) {
my ( $self, $job, $workload ) = @_;
my $table = delete $workload->{_table} || die "no _table";
$self->pg_insert($table => $workload);
$voice->connectSAU( $sau->[ $sru{serverid} ] );
my %sau = $voice->searchSAU($sru{alias});
- $self->do_background_json( 'Store_voice', { broj => $broj, state => $sau{state}, serverid => $sru{serverid} });
+ $self->do_background_json( 'Store_insert', {
+ _table => 'voice',
+ timestamp => $self->datetime_now,
+ broj => $broj,
+ state => $sau{state},
+ serverid => $sru{serverid},
+ });
return { broj => $workload, sru => \%sru, sau => \%sau };
}
ok my $o = APKPM::Store->new, 'new';
-ok my $r = $o->ADSL('job',{
+ok my $r = $o->insert('job',{
+_table => 'adsl',
ip => '127.0.0.1',
username => 'nobody',
-ADSL => {
- ATTNRX => "36.5",
- ATTNTX => "17.8",
- MAXRX => 13500,
- MAXTX => 880,
- PWRRX => "0.0",
- PWRTX => "12.6",
- RX => 8500,
- SNRRX => "11.4",
- SNRTX => "16.0",
- TX => 798,
-}
+ATTNRX => "36.5",
+ATTNTX => "17.8",
+MAXRX => 13500,
+MAXTX => 880,
+PWRRX => "0.0",
+PWRTX => "12.6",
+RX => 8500,
+SNRRX => "11.4",
+SNRTX => "16.0",
+TX => 798,
}), 'ADSL';
diag dump($r);
-ok my $r = $o->ping('job',{
+ok my $r = $o->insert('job',{
+_table => 'ping',
ip => '127.0.0.1',
username => 'nobody',
rtt => 0.042,