use Time::HiRes;
use Data::Dump qw(dump);
use DBD::Pg;
+use Redis;
with 'APKPM::Gearman';
return "$orig ($job_name)";
}
+sub dbh {
+ DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
+ RaiseError => 1,
+ AutoCommit => 1,
+ });
+}
sub pg_insert {
my ( $self, $table, $h ) = @_;
- my $dbh = DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
- RaiseError => 1,
- AutoCommit => 1,
- });
+ my $redis = Redis->new;
- my @c = keys %$h;
+ my @c;
+
+ if ( my $cols = $redis->get("pg.$table") ) {
+ @c = split(/\s+/,$cols);
+ } else {
+ my $sth = $self->dbh->prepare( "select * from $table limit 1" );
+ $sth->execute;
+ @c = @{ $sth->{NAME_lc} };
+ $redis->set( "pg.$table" => join(' ',@c) );
+ $redis->expire( "pg.$table" => 5 * 60 ); # refresh every 5 min
+ }
my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
warn $sql;
- my $sth = $dbh->prepare($sql);
- $sth->execute( map { $h->{$_} } @c );
+ my $sth = $self->dbh->prepare($sql);
+
+ my $h_lc;
+ $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
+
+ $sth->execute( map { $h_lc->{$_} } @c );
}
-sub ADSL : Job : MinProcesses(1) : MaxProcesses(5) : Decode(d_json) {
+sub insert : Job : Decode(d_json) : MinProcesses(0) {
my ( $self, $job, $workload ) = @_;
+ my $table = delete $workload->{_table} || die "no _table";
+ $self->pg_insert($table => $workload);
+}
- my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
- foreach my $c ( qw(ip username) ) {
- $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
- }
+sub sql : Job : Encode(e_json) : MinProcesses(0) {
+ my ( $self, $job, $workload ) = @_;
- warn "# ADSL ", dump $h;
+ my $sth = $self->dbh->prepare($workload);
+ my $rows = eval { $sth->execute };
+ return { error => $@ } if $@;
- $self->pg_insert(adsl => $h)
-}
+ warn "# $rows rows get_username_table $workload\n";
-sub ping : Job : MinProcesses(1) : MaxProcesses(5) : Decode(d_json) {
- my ( $self, $job, $workload ) = @_;
+ $rows = $sth->fetchall_arrayref;
+ my @columns = @{ $sth->{NAME} };
- $self->pg_insert(ping => $workload);
+ # decode hash column
+ my $hash_col;
+ foreach ( 0 .. $#columns ) {
+ $hash_col = $_ if $columns[$_] eq 'h';
+ }
+ if ( defined $hash_col ) {
+ map {
+ my $hash = $_->[$hash_col];
+ $hash =~ s/\@/\\\@/g;
+ $_->[$hash_col] = eval "{ $hash }";
+ $_->[$hash_col] = "ERROR: $@ for $hash" if $@;
+ } @$rows
+ }
+
+ return {
+ columns => \@columns,
+ rows => $rows,
+ hash_col => $hash_col,
+ };
}
+
1;