bce29ac68d721d06561bd6a90e96c5e20821fb45
[APKPM.git] / lib / APKPM / Store.pm
1 package APKPM::Store;
2
3 use base qw(Gearman::Driver::Worker);
4 use Moose;
5 use Time::HiRes;
6 use Data::Dump qw(dump);
7 use DBD::Pg;
8 use Redis;
9
10 with 'APKPM::Gearman';
11
12 sub prefix { 'Store_' }
13
14 sub process_name {
15         my ( $self, $orig, $job_name ) = @_;
16         warn "# process_name $orig $job_name\n";
17         return "$orig ($job_name)";
18 }
19
20 sub dbh {
21         DBI->connect_cached('DBI:Pg:dbname=apkpm','','', {
22                 RaiseError => 1,
23                 AutoCommit => 1,
24         });
25 }
26
27 our $redis;
28 sub redis { $redis ||= Redis->new };
29
30 sub _create_index {
31         my ( $self, $table, $timestamp ) = @_;
32
33         my $sth = $self->dbh->prepare( qq{
34
35 select
36     t.relname as table_name,
37     i.relname as index_name,
38     a.attname as column_name
39 from
40     pg_class t,
41     pg_class i,
42     pg_index ix,
43     pg_attribute a
44 where
45     t.oid = ix.indrelid
46     and i.oid = ix.indexrelid
47     and a.attrelid = t.oid
48     and a.attnum = ANY(ix.indkey)
49     and t.relkind = 'r'
50     and t.relname like ?
51 order by
52     t.relname,
53     i.relname;
54
55         } );
56         $sth->execute($table);
57         my $sql = '';
58         while( my $row = $sth->fetchrow_hashref ) {
59                 #warn "# _create_index $table ",dump($row);
60                 $sql .= qq|create index $row->{index_name}_$timestamp on ${table}_$timestamp($row->{column_name});\n|;
61         }
62
63         return $sql;
64 }
65
66 sub pg_insert {
67         my ( $self, $base_table, $h ) = @_;
68
69         my $table = $base_table;
70
71         my @c;
72
73         my $timestamp =
74                 exists $h->{timestamp} ? $h->{timestamp} :
75                 exists $h->{start}     ? $h->{start}     :
76                 warn "no timestamp partitioning on $table";
77
78         my $create_table;
79         if ( $timestamp ) {
80                 $timestamp =~ s/^(\d{4})-(\d{2})-(\d{2}).*$/$1_$2_$3/;
81                 my $part = $table . '_' . $timestamp;
82                 $create_table = qq{
83                         create table $part () inherits ($table) ;
84                 };
85                 $create_table .= $self->_create_index( $table, $timestamp );
86                 $table = $part;
87                 warn "# using partition $table";
88         }
89
90         if ( my $cols = $self->redis->get("pg.$table") ) {
91                 @c = split(/\s+/,$cols);
92         } else {
93                 my $sth = $self->dbh->prepare( "select * from $table limit 1" );
94                 eval { $sth->execute; };
95                 if ( $@ ) {
96                         warn "ERROR $@\n# try $create_table\n";
97                         $self->dbh->do( $create_table );
98                         $sth->execute;
99                 }
100
101                 @c = @{ $sth->{NAME_lc} };
102                 $self->redis->set( "pg.$table" => join(' ',@c) );
103                 $self->redis->expire( "pg.$table" => 15 * 60 ); # refresh every 15 min
104         }
105
106         my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
107         warn $sql;
108         my $sth = $self->dbh->prepare($sql);
109
110         my $h_lc;
111         $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
112
113         if ( my $username = $h->{username} ) {
114                 my $key = join('.', 'table', $base_table, $username);
115                 $self->redis->set( $key => $self->e_json($h) );
116                 $self->redis->expire( $key => 15 * 60 ); # 15 min timeout
117         }
118
119         $h_lc->{h} = $self->to_hstore( $h_lc->{h} ) if exists $h_lc->{h};
120
121         $sth->execute( map { $h_lc->{$_} } @c );
122 }
123
124 sub insert : Job : Decode(d_json) : MinProcesses(0) {
125         my ( $self, $job, $workload ) = @_;
126         my $table = delete $workload->{_table} || die "no _table";
127         $self->pg_insert($table => $workload);
128 }
129
130 sub sql : Job : Encode(e_json) : MinProcesses(1) {
131         my ( $self, $job, $workload ) = @_;
132
133         my $sth = $self->dbh->prepare($workload);
134         my $rows = eval { $sth->execute };
135         return { error => $@ } if $@;
136
137         warn "# $rows rows get_username_table $workload\n";
138
139         $rows = $sth->fetchall_arrayref;
140         my @columns = @{ $sth->{NAME} };
141
142         # decode hash column
143         my $hash_col;
144         foreach ( 0 .. $#columns ) {
145                 $hash_col = $_ if $columns[$_] eq 'h';
146         }
147         if ( defined $hash_col ) {
148                 map {
149                         my $hash = $_->[$hash_col];
150                         $hash =~ s/\@/\\\@/g;
151                         $_->[$hash_col] = eval "{ $hash }";
152                         $_->[$hash_col] = "ERROR: $@ for $hash" if $@;
153                 } @$rows
154         }
155
156         return {
157                 columns => \@columns,
158                 rows => $rows,
159                 hash_col => $hash_col,
160         };
161 }
162
163 1;