Merge branch 'devel' of h1dev:/srv/APKPM/
[APKPM.git] / lib / APKPM / Store.pm
index c617b5e..962e7c5 100644 (file)
@@ -5,9 +5,12 @@ use Moose;
 use Time::HiRes;
 use Data::Dump qw(dump);
 use DBD::Pg;
+use Redis;
 
 with 'APKPM::Gearman';
 
+my $redis_ttl = 30 * 60; # 30 min
+
 sub prefix { 'Store_' }
 
 sub process_name {
@@ -23,59 +26,143 @@ sub dbh {
        });
 }
 
-sub pg_insert {
-       my ( $self, $table, $h ) = @_;
+our $redis;
+sub redis { $redis ||= Redis->new };
+
+sub _create_index {
+       my ( $self, $table, $timestamp ) = @_;
+
+       my $sth = $self->dbh->prepare( qq{
+
+select
+    t.relname as table_name,
+    i.relname as index_name,
+    a.attname as column_name
+from
+    pg_class t,
+    pg_class i,
+    pg_index ix,
+    pg_attribute a
+where
+    t.oid = ix.indrelid
+    and i.oid = ix.indexrelid
+    and a.attrelid = t.oid
+    and a.attnum = ANY(ix.indkey)
+    and t.relkind = 'r'
+    and t.relname like ?
+order by
+    t.relname,
+    i.relname;
+
+       } );
+       $sth->execute($table);
+       my $sql = '';
+       while( my $row = $sth->fetchrow_hashref ) {
+               #warn "# _create_index $table ",dump($row);
+               $sql .= qq|create index $row->{index_name}_$timestamp on ${table}_$timestamp($row->{column_name});\n|;
+       }
 
+       return $sql;
+}
 
-       my @c = keys %$h;
+sub pg_insert {
+       my ( $self, $base_table, $h ) = @_;
+
+       my $table = $base_table;
+
+       my @c;
+
+       my $timestamp =
+               exists $h->{timestamp} ? $h->{timestamp} :
+               exists $h->{start}     ? $h->{start}     :
+               warn "no timestamp partitioning on $table";
+
+       my $create_table;
+       if ( $timestamp ) {
+               $timestamp =~ s/^(\d{4})-(\d{2})-(\d{2}).*$/$1_$2_$3/;
+               my $part = $table . '_' . $timestamp;
+               $create_table = qq{
+                       create table $part () inherits ($table) ;
+               };
+               $create_table .= $self->_create_index( $table, $timestamp );
+               $table = $part;
+               warn "# using partition $table";
+       }
+
+       if ( my $cols = $self->redis->get("columns.$table") ) {
+               @c = split(/\s+/,$cols);
+       } else {
+               my $sth = $self->dbh->prepare( "select * from $table limit 1" );
+               eval { $sth->execute; };
+               if ( $@ ) {
+                       warn "ERROR $@\n# try $create_table\n";
+                       $self->dbh->do( $create_table );
+                       $sth->execute;
+               }
+
+               @c = @{ $sth->{NAME_lc} };
+               $self->redis->set( "columns.$table" => join(' ',@c) );
+               $self->redis->expire( "columns.$table" => $redis_ttl );
+
+               # web interface requires base table columns
+               $self->redis->set( "columns.$base_table" => join(' ',@c) );
+               $self->redis->expire( "columns.$base_table" => $redis_ttl );
+       }
 
        my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
        warn $sql;
        my $sth = $self->dbh->prepare($sql);
-       $sth->execute( map { $h->{$_} } @c );
-}
 
-sub ADSL : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
+       my $h_lc;
+       $h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
 
-       my $h = $workload->{ADSL} || die "no ADSL in ",dump $workload;
-       foreach my $c ( qw(ip username) ) {
-               $h->{$c} = $workload->{$c} || die "no $c in ",dump $workload;
+       if ( my $username = $h->{username} ) {
+               my $key = join('.', 'table', $base_table, $username);
+               $self->redis->set( $key => $self->e_json($h) );
+               $self->redis->expire( $key => $redis_ttl );
        }
 
-       warn "# ADSL ", dump $h;
+       $h_lc->{h} = $self->to_hstore( $h_lc->{h} ) if exists $h_lc->{h};
 
-       $self->pg_insert(adsl => $h)
+       $sth->execute( map { $h_lc->{$_} } @c );
 }
 
-sub ping : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-
-       $self->pg_insert(ping => $workload);
-}
-
-sub voice : Job : Decode(d_json) {
-       my ( $self, $job, $workload ) = @_;
-       $self->pg_insert(voice => $workload);
-}
-
-sub insert_table : Job : Decode(d_json) {
+sub insert : Job : Decode(d_json) : MinProcesses(0) {
        my ( $self, $job, $workload ) = @_;
        my $table = delete $workload->{_table} || die "no _table";
        $self->pg_insert($table => $workload);
 }
 
-sub sql : Job : Encode(e_json) {
+sub sql : Job : Encode(e_json) : MinProcesses(1) {
        my ( $self, $job, $workload ) = @_;
 
        my $sth = $self->dbh->prepare($workload);
-       my $rows = $sth->execute;
+       my $rows = eval { $sth->execute };
+       return { error => $@ } if $@;
 
        warn "# $rows rows get_username_table $workload\n";
 
+       $rows = $sth->fetchall_arrayref;
+       my @columns = @{ $sth->{NAME} };
+
+       # decode hash column
+       my $hash_col;
+       foreach ( 0 .. $#columns ) {
+               $hash_col = $_ if $columns[$_] eq 'h';
+       }
+       if ( defined $hash_col ) {
+               map {
+                       my $hash = $_->[$hash_col];
+                       $hash =~ s/\@/\\\@/g;
+                       $_->[$hash_col] = eval "{ $hash }";
+                       $_->[$hash_col] = "ERROR: $@ for $hash" if $@;
+               } @$rows
+       }
+
        return {
-               columns => $sth->{NAME},
-               rows => $sth->fetchall_arrayref,
+               columns => \@columns,
+               rows => $rows,
+               hash_col => $hash_col,
        };
 }