correctly decode hstore with @ in data
[APKPM.git] / lib / APKPM / Store.pm
index ae0d42a..19aaa6c 100644 (file)
@@ -5,6 +5,7 @@ use Moose;
 use Time::HiRes;
 use Data::Dump qw(dump);
 use DBD::Pg;
+use Redis;
 
 with 'APKPM::Gearman';
 
@@ -16,39 +17,77 @@ sub process_name {
        return "$orig ($job_name)";
 }
 
+sub dbh {
+       DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
+               RaiseError => 1,
+               AutoCommit => 1,
+       });
+}
 
 sub pg_insert {
        my ( $self, $table, $h ) = @_;
 
-       my $dbh = DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
-               RaiseError => 1,
-               AutoCommit => 1,
-       });
+       my $redis = Redis->new;
 
-       my @c = keys %$h;
+       my @c;
+
+       if ( my $cols = $redis->get("pg.$table") ) {
+               @c = split(/\s+/,$cols);
+       } else {
+               my $sth = $self->dbh->prepare( "select * from $table limit 1" );
+               $sth->execute;
+               @c = @{ $sth->{NAME_lc} };
+               $redis->set( "pg.$table" => join(' ',@c) );
+               $redis->expire( "pg.$table" => 5 * 60 ); # refresh every 5 min
+       }
 
        my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
        warn $sql;
-       my $sth = $dbh->prepare($sql);
-       $sth->execute( map { $h->{$_} } @c );
+       my $sth = $self->dbh->prepare($sql);
+
+       my $h_lc;
+       $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
+
+       $sth->execute( map { $h_lc->{$_} } @c );
 }
 
-sub ADSL : Job : MinProcesses(1) : MaxProcesses(5) : Decode(d_json) {
+sub insert : Job : Decode(d_json) : MinProcesses(0) {
        my ( $self, $job, $workload ) = @_;
+       my $table = delete $workload->{_table} || die "no _table";
+       $self->pg_insert($table => $workload);
+}
 
-       my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
-       foreach my $c ( qw(ip username) ) {
-               $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
-       }
+sub sql : Job : Encode(e_json) : MinProcesses(0) {
+       my ( $self, $job, $workload ) = @_;
 
-       warn "# ADSL ", dump $h;
+       my $sth = $self->dbh->prepare($workload);
+       my $rows = eval { $sth->execute };
+       return { error => $@ } if $@;
 
-       $self->pg_insert(adsl => $h)
-}
+       warn "# $rows rows get_username_table $workload\n";
 
-sub ping : Job : MinProcesses(1) : MaxProcesses(5) : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
+       $rows = $sth->fetchall_arrayref;
+       my @columns = @{ $sth->{NAME} };
 
-       $self->pg_insert(ping => $workload);
+       # decode hash column
+       my $hash_col;
+       foreach ( 0 .. $#columns ) {
+               $hash_col = $_ if $columns[$_] eq 'h';
+       }
+       if ( defined $hash_col ) {
+               map {
+                       my $hash = $_->[$hash_col];
+                       $hash =~ s/\@/\\\@/g;
+                       $_->[$hash_col] = eval "{ $hash }";
+                       $_->[$hash_col] = "ERROR: $@ for $hash" if $@;
+               } @$rows
+       }
+
+       return {
+               columns => \@columns,
+               rows => $rows,
+               hash_col => $hash_col,
+       };
 }
+
 1;