3 use base qw(Gearman::Driver::Worker);
6 use Data::Dump qw(dump);
10 with 'APKPM::Gearman';
# Job-name prefix for this worker: always the constant string 'Store_'.
# (Gearman::Driver::Worker documents `prefix` as the string prepended to the
# names of the jobs a worker registers.)
sub prefix {
    return 'Store_';
}
# Body of what appears to be the process_name hook -- the `sub` line is not
# visible in this excerpt; the name is taken from the warn message below
# (TODO confirm).
#
# Arguments: the invocant, the original process name ($orig) and the job
# name ($job_name).
15 my ( $self, $orig, $job_name ) = @_;
# Debug trace on STDERR showing both names.
16 warn "# process_name $orig $job_name\n";
# Result: the original name with the job name appended in parentheses.
17 return "$orig ($job_name)";
# Database handle for the PostgreSQL database `apkpm`, opened with an empty
# user name and password. DBI's connect_cached hands back the same handle
# for repeated calls with identical arguments instead of reconnecting.
# NOTE(review): the attribute hash opened here continues past the visible
# line, so error-handling settings such as RaiseError cannot be confirmed
# from this excerpt.
21 DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
# Body of what appears to be pg_insert -- the `sub` line, a few declarations
# and the branch delimiters are not visible in this excerpt (TODO confirm).
#
# Inserts one row into $table. $h is a hash reference of column => value;
# its keys are matched case-insensitively against the table's column names.
28 my ( $self, $table, $h ) = @_;
# NOTE(review): a new Redis client object is created on every call.
30 my $redis = Redis->new;
# The column list of each table is cached in Redis under "pg.<table>" as a
# space-separated string.
34 if ( my $cols = $redis->get("pg.$table") ) {
35 @c = split(/\s+/,$cols);
# Presumably the cache-miss path: take the lower-cased column names from the
# statement handle of a one-row probe query and store them in Redis.
# NOTE(review): $table is interpolated into the SQL text unquoted here and
# in the INSERT below -- callers must only pass trusted table names.
37 my $sth = $self->dbh->prepare( "select * from $table limit 1" );
39 @c = @{ $sth->{NAME_lc} };
40 $redis->set( "pg.$table" => join(' ',@c) );
41 $redis->expire( "pg.$table" => 5 * 60 ); # refresh every 5 min
# INSERT naming every known column, with one `?` placeholder per column.
44 my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
46 my $sth = $self->dbh->prepare($sql);
# Re-key the input by lower-cased column name so it lines up with @c.
49 $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
# Bind values in column order: columns missing from $h are bound as undef,
# and keys of $h that are not table columns are ignored.
51 $sth->execute( map { $h_lc->{$_} } @c );
# Gearman job: insert one row into a PostgreSQL table.
#
# The JSON-decoded workload is a hash of column => value pairs plus a
# mandatory `_table` key naming the target table. `_table` is removed from
# the hash and the remaining pairs are handed to pg_insert.
#
# Dies with "no _table" when the key is missing or false, and with
# "invalid _table: ..." when the name is not a plain, optionally
# schema-qualified, SQL identifier.
#
# Returns whatever pg_insert returns.
sub insert : Job : Decode(d_json) : MinProcesses(0) {
    my ( $self, $job, $workload ) = @_;

    my $table = delete $workload->{_table} || die "no _table";

    # SECURITY: the table name comes straight from the job payload and
    # pg_insert interpolates it into SQL text, so refuse anything that is
    # not a bare identifier (letters, digits, underscore; one optional
    # "schema." qualifier). Names that would need SQL quoting are rejected.
    die "invalid _table: $table"
        unless $table =~ m{\A [A-Za-z_][A-Za-z0-9_]* (?: [.] [A-Za-z_][A-Za-z0-9_]* )? \z}x;

    return $self->pg_insert( $table => $workload );
}
# Gearman job: run one SQL statement and return its result set.
#
# The workload is the SQL text itself and is executed verbatim.
# NOTE(review): this gives the submitter full SQL access to the database;
# only trusted clients should be able to reach this job.
#
# Returns a hash reference that the worker encodes as JSON:
#   { columns => [names...], rows => [[values...], ...] }   on success
#   { error   => "message" }                                on failure
sub sql : Job : Encode(e_json) : MinProcesses(0) {
    my ( $self, $job, $workload ) = @_;

    # Prepare and execute inside one eval so that a failure in either step
    # is reported through the { error => ... } result instead of killing
    # the job.
    my ( $sth, $rows );
    my $completed = eval {
        $sth  = $self->dbh->prepare($workload);
        $rows = $sth->execute if $sth;
        1;
    };
    my $exception = $@;    # capture at once; $@ is global and easily clobbered

    # execute() returns undef on failure when RaiseError is off, so an
    # undefined row count is an error even if nothing was thrown.
    if ( !$completed || !defined $rows ) {
        return { error => $exception || $DBI::errstr || 'unknown database error' };
    }

    warn "# $rows rows sql $workload\n";

    return {
        columns => $sth->{NAME},
        rows    => $sth->fetchall_arrayref,
    };
}