web interface requires columns also under base table name
[APKPM.git] / lib / APKPM / Store.pm
1 package APKPM::Store;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes;
6 use Data::Dump qw(dump);
7 use DBD::Pg;
8 use Redis;
9
10 with 'APKPM::Gearman';
11
12 my $redis_ttl = 30 * 60; # 30 min
13
14 sub prefix { 'Store_' }
15
16 sub process_name {
17         my ( $self, $orig, $job_name ) = @_;
18         warn "# process_name $orig $job_name\n";
19         return "$orig ($job_name)";
20 }
21
22 sub dbh {
23         DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
24                 RaiseError => 1,
25                 AutoCommit => 1,
26         });
27 }
28
29 our $redis;
30 sub redis { $redis ||= Redis->new };
31
32 sub _create_index {
33         my ( $self, $table, $timestamp ) = @_;
34
35         my $sth = $self->dbh->prepare( qq{
36
37 select
38     t.relname as table_name,
39     i.relname as index_name,
40     a.attname as column_name
41 from
42     pg_class t,
43     pg_class i,
44     pg_index ix,
45     pg_attribute a
46 where
47     t.oid = ix.indrelid
48     and i.oid = ix.indexrelid
49     and a.attrelid = t.oid
50     and a.attnum = ANY(ix.indkey)
51     and t.relkind = 'r'
52     and t.relname like ?
53 order by
54     t.relname,
55     i.relname;
56
57         } );
58         $sth->execute($table);
59         my $sql = '';
60         while( my $row = $sth->fetchrow_hashref ) {
61                 #warn "# _create_index $table ",dump($row);
62                 $sql .= qq|create index $row->{index_name}_$timestamp on ${table}_$timestamp($row->{column_name});\n|;
63         }
64
65         return $sql;
66 }
67
68 sub pg_insert {
69         my ( $self, $base_table, $h ) = @_;
70
71         my $table = $base_table;
72
73         my @c;
74
75         my $timestamp =
76                 exists $h->{timestamp} ? $h->{timestamp} :
77                 exists $h->{start}     ? $h->{start}     :
78                 warn "no timestamp partitioning on $table";
79
80         my $create_table;
81         if ( $timestamp ) {
82                 $timestamp =~ s/^(\d{4})-(\d{2})-(\d{2}).*$/$1_$2_$3/;
83                 my $part = $table . '_' . $timestamp;
84                 $create_table = qq{
85                         create table $part () inherits ($table) ;
86                 };
87                 $create_table .= $self->_create_index( $table, $timestamp );
88                 $table = $part;
89                 warn "# using partition $table";
90         }
91
92         if ( my $cols = $self->redis->get("pg.$table") ) {
93                 @c = split(/\s+/,$cols);
94         } else {
95                 my $sth = $self->dbh->prepare( "select * from $table limit 1" );
96                 eval { $sth->execute; };
97                 if ( $@ ) {
98                         warn "ERROR $@\n# try $create_table\n";
99                         $self->dbh->do( $create_table );
100                         $sth->execute;
101                 }
102
103                 @c = @{ $sth->{NAME_lc} };
104                 $self->redis->set( "pg.$table" => join(' ',@c) );
105                 $self->redis->expire( "pg.$table" => $redis_ttl );
106
107                 # web interface requires base table columns
108                 $self->redis->set( "pg.$base_table" => join(' ',@c) );
109                 $self->redis->expire( "pg.$base_table" => $redis_ttl );
110         }
111
112         my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
113         warn $sql;
114         my $sth = $self->dbh->prepare($sql);
115
116         my $h_lc;
117         $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
118
119         if ( my $username = $h->{username} ) {
120                 my $key = join('.', 'table', $base_table, $username);
121                 $self->redis->set( $key => $self->e_json($h) );
122                 $self->redis->expire( $key => $redis_ttl );
123         }
124
125         $h_lc->{h} = $self->to_hstore( $h_lc->{h} ) if exists $h_lc->{h};
126
127         $sth->execute( map { $h_lc->{$_} } @c );
128 }
129
130 sub insert : Job : Decode(d_json) : MinProcesses(0) {
131         my ( $self, $job, $workload ) = @_;
132         my $table = delete $workload->{_table} || die "no _table";
133         $self->pg_insert($table => $workload);
134 }
135
136 sub sql : Job : Encode(e_json) : MinProcesses(1) {
137         my ( $self, $job, $workload ) = @_;
138
139         my $sth = $self->dbh->prepare($workload);
140         my $rows = eval { $sth->execute };
141         return { error => $@ } if $@;
142
143         warn "# $rows rows get_username_table $workload\n";
144
145         $rows = $sth->fetchall_arrayref;
146         my @columns = @{ $sth->{NAME} };
147
148         # decode hash column
149         my $hash_col;
150         foreach ( 0 .. $#columns ) {
151                 $hash_col = $_ if $columns[$_] eq 'h';
152         }
153         if ( defined $hash_col ) {
154                 map {
155                         my $hash = $_->[$hash_col];
156                         $hash =~ s/\@/\\\@/g;
157                         $_->[$hash_col] = eval "{ $hash }";
158                         $_->[$hash_col] = "ERROR: $@ for $hash" if $@;
159                 } @$rows
160         }
161
162         return {
163                 columns => \@columns,
164                 rows => $rows,
165                 hash_col => $hash_col,
166         };
167 }
168
169 1;