added ping store and refactored common code
[APKPM.git] / lib / APKPM / Store.pm
1 package APKPM::Store;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes;
6 use Data::Dump qw(dump);
7 use DBD::Pg;
8
9 with 'APKPM::Gearman';
10
11 sub prefix { 'Store_' }
12
13 sub process_name {
14         my ( $self, $orig, $job_name ) = @_;
15         warn "# process_name $orig $job_name\n";
16         return "$orig ($job_name)";
17 }
18
19
20 sub pg_insert {
21         my ( $self, $table, $h ) = @_;
22
23         my $dbh = DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
24                 RaiseError => 1,
25                 AutoCommit => 1,
26         });
27
28         my @c = keys %$h;
29
30         my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
31         warn $sql;
32         my $sth = $dbh->prepare($sql);
33         $sth->execute( map { $h->{$_} } @c );
34 }
35
36 sub ADSL : Job : MinProcesses(1) : MaxProcesses(5) : Decode(d_json) {
37         my ( $self, $job, $workload ) = @_;
38
39         my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
40         foreach my $c ( qw(ip username) ) {
41                 $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
42         }
43
44         warn "# ADSL ", dump $h;
45
46         $self->pg_insert(adsl => $h)
47 }
48
49 sub ping : Job : MinProcesses(1) : MaxProcesses(5) : Decode(d_json) {
50         my ( $self, $job, $workload ) = @_;
51
52         $self->pg_insert(ping => $workload);
53 }
54 1;