X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FAPKPM%2FStore.pm;h=def1d83fd1952057d19894288a9d5a9b93bd2ce6;hb=a57422e1cd4ee04a99af24ba38de0631ca2b8a7a;hp=c617b5e872063bba1e17c635c63207f22c16e289;hpb=ff4d7d3a138939ab9130c26c93b68ac461116842;p=APKPM.git diff --git a/lib/APKPM/Store.pm b/lib/APKPM/Store.pm index c617b5e..def1d83 100644 --- a/lib/APKPM/Store.pm +++ b/lib/APKPM/Store.pm @@ -5,6 +5,7 @@ use Moose; use Time::HiRes; use Data::Dump qw(dump); use DBD::Pg; +use Redis; with 'APKPM::Gearman'; @@ -26,56 +27,72 @@ sub dbh { sub pg_insert { my ( $self, $table, $h ) = @_; + my $redis = Redis->new; - my @c = keys %$h; + my @c; + + if ( my $cols = $redis->get("pg.$table") ) { + @c = split(/\s+/,$cols); + } else { + my $sth = $self->dbh->prepare( "select * from $table limit 1" ); + $sth->execute; + @c = @{ $sth->{NAME_lc} }; + $redis->set( "pg.$table" => join(' ',@c) ); + $redis->expire( "pg.$table" => 5 * 60 ); # refresh every 5 min + } my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')'; warn $sql; my $sth = $self->dbh->prepare($sql); - $sth->execute( map { $h->{$_} } @c ); -} -sub ADSL : Job : Decode(d_json) { - my ( $self, $job, $workload ) = @_; + my $h_lc; + $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h; - my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload; - foreach my $c ( qw(ip username) ) { - $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload; + if ( my $username = $h->{username} ) { + my $key = join('.', 'table', $table, $username); + $redis->set( $key => $self->e_json($h) ); + $redis->expire( $key => 15 * 60 ); # 15 min timeout } - warn "# ADSL ", dump $h; - - $self->pg_insert(adsl => $h) -} - -sub ping : Job : Decode(d_json) { - my ( $self, $job, $workload ) = @_; - - $self->pg_insert(ping => $workload); -} - -sub voice : Job : Decode(d_json) { - my ( $self, $job, $workload ) = @_; - $self->pg_insert(voice => $workload); + $sth->execute( map { $h_lc->{$_} } @c ); } -sub insert_table : Job : Decode(d_json) { +sub insert : Job : Decode(d_json) : MinProcesses(0) { my ( $self, $job, $workload ) = @_; my $table = delete $workload->{_table} || die "no _table"; $self->pg_insert($table => $workload); } -sub sql : Job : Encode(e_json) { +sub sql : Job : Encode(e_json) : MinProcesses(1) { my ( $self, $job, $workload ) = @_; my $sth = $self->dbh->prepare($workload); - my $rows = $sth->execute; + my $rows = eval { $sth->execute }; + return { error => $@ } if $@; warn "# $rows rows get_username_table $workload\n"; + $rows = $sth->fetchall_arrayref; + my @columns = @{ $sth->{NAME} }; + + # decode hash column + my $hash_col; + foreach ( 0 .. $#columns ) { + $hash_col = $_ if $columns[$_] eq 'h'; + } + if ( defined $hash_col ) { + map { + my $hash = $_->[$hash_col]; + $hash =~ s/\@/\\\@/g; + $_->[$hash_col] = eval "{ $hash }"; + $_->[$hash_col] = "ERROR: $@ for $hash" if $@; + } @$rows + } + return { - columns => $sth->{NAME}, - rows => $sth->fetchall_arrayref, + columns => \@columns, + rows => $rows, + hash_col => $hash_col, }; }