3 use base qw(Gearman::Driver::Worker);
6 use Data::Dump qw(dump);
10 with 'APKPM::Gearman';
# Expiry value handed to redis->expire for the keys this worker caches (30 * 60 = 1800).
12 my $redis_ttl = 30 * 60; # 30 min
# Returns the constant string 'Store_' regardless of the invocant or arguments.
# Presumably consumed by Gearman::Driver as the job-name prefix for this
# worker -- confirm against the Gearman::Driver::Worker documentation.
sub prefix {
    return 'Store_';
}
# Receives the invocant, an original name ($orig) and a job name; emits a
# warning containing both and returns the original name followed by the job
# name in parentheses.
17 my ( $self, $orig, $job_name ) = @_;
18 warn "# process_name $orig $job_name\n";
19 return "$orig ($job_name)";
# Cached DBI handle for the PostgreSQL database 'apkpm', opened with an empty
# user name and an empty password.
23 DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
# Lazily creates the Redis client on first use and keeps it in the outer
# $redis variable (declared outside this sub), so later calls reuse it.
sub redis {
    $redis = Redis->new unless $redis;
    return $redis;
}
# Builds "create index" SQL text for the partition ${table}_$timestamp: one
# statement per row returned by the catalogue query, which is executed with
# $table as its single bind value.
# NOTE(review): the query looks like a pg_class/pg_index/pg_attribute join --
# confirm against the full statement.
33 my ( $self, $table, $timestamp ) = @_;
35 my $sth = $self->dbh->prepare( qq{
38 t.relname as table_name,
39 i.relname as index_name,
40 a.attname as column_name
48 and i.oid = ix.indexrelid
49 and a.attrelid = t.oid
50 and a.attnum = ANY(ix.indkey)
58 $sth->execute($table);
# Each row yields a single-column index named <index_name>_<timestamp>.
# NOTE(review): the query matches every column in ix.indkey, so a multi-column
# index appears to produce several rows with the same index_name and therefore
# several statements creating the same index name -- verify.
60 while( my $row = $sth->fetchrow_hashref ) {
61 #warn "# _create_index $table ",dump($row);
62 $sql .= qq|create index $row->{index_name}_$timestamp on ${table}_$timestamp($row->{column_name});\n|;
# Inserts the record $h (hash reference) as one row, targeting $base_table or
# a date-suffixed partition of it, and caches the table's column list in Redis.
# Returns the result of the final $sth->execute.
69 my ( $self, $base_table, $h ) = @_;
71 my $table = $base_table;
# Partition key: $h->{timestamp} when that key exists, otherwise $h->{start}.
# NOTE(review): the last arm of this chain is the warn call itself, so when
# neither key exists the expression yields warn's return value, not undef --
# confirm that this is intended.
76 exists $h->{timestamp} ? $h->{timestamp} :
77 exists $h->{start} ? $h->{start} :
78 warn "no timestamp partitioning on $table";
# A value starting with YYYY-MM-DD is reduced to YYYY_MM_DD (the rest of the
# string is dropped); anything else is left unchanged.  The partition name is
# the table name, an underscore, and that value.
82 $timestamp =~ s/^(\d{4})-(\d{2})-(\d{2}).*$/$1_$2_$3/;
83 my $part = $table . '_' . $timestamp;
85 create table $part () inherits ($table) ;
87 $create_table .= $self->_create_index( $table, $timestamp );
89 warn "# using partition $table";
# The column list for $table is cached in Redis under "pg.$table" as a
# whitespace-separated string.
92 if ( my $cols = $self->redis->get("pg.$table") ) {
93 @c = split(/\s+/,$cols);
# Presumably the cache-miss path: probe the table to learn its columns; if the
# probe fails, log the error and run $create_table.
95 my $sth = $self->dbh->prepare( "select * from $table limit 1" );
96 eval { $sth->execute; };
98 warn "ERROR $@\n# try $create_table\n";
99 $self->dbh->do( $create_table );
# Lower-cased column names from the probe are cached under the same TTL.
103 @c = @{ $sth->{NAME_lc} };
104 $self->redis->set( "pg.$table" => join(' ',@c) );
105 $self->redis->expire( "pg.$table" => $redis_ttl );
107 # web interface requires base table columns
108 $self->redis->set( "pg.$base_table" => join(' ',@c) );
109 $self->redis->expire( "pg.$base_table" => $redis_ttl );
# One "?" placeholder per column, so row values are bound rather than
# interpolated.
# NOTE(review): $table and the column names are interpolated into the SQL
# text unquoted; the insert job takes the table name from its workload, so
# validate or quote it before it reaches this point.
112 my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
114 my $sth = $self->dbh->prepare($sql);
# Copy of the record with lower-cased keys, so lookups line up with the
# lower-cased column names in @c.
117 $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
# When the record has a true username, the whole original record is also
# stored as JSON under "table.<base_table>.<username>" with the same TTL.
119 if ( my $username = $h->{username} ) {
120 my $key = join('.', 'table', $base_table, $username);
121 $self->redis->set( $key => $self->e_json($h) );
122 $self->redis->expire( $key => $redis_ttl );
# The "h" entry, when present, is passed through $self->to_hstore before
# binding.
125 $h_lc->{h} = $self->to_hstore( $h_lc->{h} ) if exists $h_lc->{h};
# Bind values in column order; columns absent from the record bind as undef.
127 $sth->execute( map { $h_lc->{$_} } @c );
# Job entry point (Job attribute); the workload is decoded with d_json per
# the Decode attribute.  The workload's _table entry is removed and used as
# the destination table, and the remaining keys are passed to pg_insert as
# the record.  Dies with "no _table" when that entry is missing or false.
130 sub insert : Job : Decode(d_json) : MinProcesses(0) {
131 my ( $self, $job, $workload ) = @_;
132 my $table = delete $workload->{_table} || die "no _table";
133 $self->pg_insert($table => $workload);
# Job entry point (Job attribute); the return value is encoded with e_json
# per the Encode attribute.  Prepares and executes the workload as one SQL
# statement and fetches all rows.
# NOTE(review): the workload is executed verbatim, so any client able to
# submit this job can run arbitrary SQL -- confirm that this is acceptable.
136 sub sql : Job : Encode(e_json) : MinProcesses(1) {
137 my ( $self, $job, $workload ) = @_;
139 my $sth = $self->dbh->prepare($workload);
# An exception from execute is returned as { error => ... } instead of being
# rethrown.
# NOTE(review): prepare above sits outside the eval, so a prepare failure is
# not converted the same way.
140 my $rows = eval { $sth->execute };
141 return { error => $@ } if $@;
143 warn "# $rows rows get_username_table $workload\n";
145 $rows = $sth->fetchall_arrayref;
146 my @columns = @{ $sth->{NAME} };
# Find the index of the column named "h"; if several columns match, the last
# one wins.
150 foreach ( 0 .. $#columns ) {
151 $hash_col = $_ if $columns[$_] eq 'h';
153 if ( defined $hash_col ) {
# NOTE(review): the "h" value is run through string eval as Perl source after
# escaping only "@".  A "$" is still interpolated inside the double-quoted
# pieces (e.g. "${\ ... }"), so crafted column data could execute code.
# Presumably the value is hstore text -- consider a real hstore parser.
155 my $hash = $_->[$hash_col];
156 $hash =~ s/\@/\\\@/g;
157 $_->[$hash_col] = eval "{ $hash }";
158 $_->[$hash_col] = "ERROR: $@ for $hash" if $@;
163 columns => \@columns,
165 hash_col => $hash_col,