with 'APKPM::Gearman';
+my $redis_ttl = 30 * 60; # 30 min
+
sub prefix { 'Store_' }
sub process_name {
});
}
+our $redis;
+sub redis { $redis ||= Redis->new };
+
+sub _create_index {
+ my ( $self, $table, $timestamp ) = @_;
+
+ my $sth = $self->dbh->prepare( qq{
+
+select
+ t.relname as table_name,
+ i.relname as index_name,
+ a.attname as column_name
+from
+ pg_class t,
+ pg_class i,
+ pg_index ix,
+ pg_attribute a
+where
+ t.oid = ix.indrelid
+ and i.oid = ix.indexrelid
+ and a.attrelid = t.oid
+ and a.attnum = ANY(ix.indkey)
+ and t.relkind = 'r'
+ and t.relname like ?
+order by
+ t.relname,
+ i.relname;
+
+ } );
+ $sth->execute($table);
+ my $sql = '';
+ while( my $row = $sth->fetchrow_hashref ) {
+ #warn "# _create_index $table ",dump($row);
+ $sql .= qq|create index $row->{index_name}_$timestamp on ${table}_$timestamp($row->{column_name});\n|;
+ }
+
+ return $sql;
+}
+
sub pg_insert {
- my ( $self, $table, $h ) = @_;
+ my ( $self, $base_table, $h ) = @_;
- my $redis = Redis->new;
+ my $table = $base_table;
my @c;
- if ( my $cols = $redis->get("pg.$table") ) {
+ my $timestamp =
+ exists $h->{timestamp} ? $h->{timestamp} :
+ exists $h->{start} ? $h->{start} :
+ warn "no timestamp partitioning on $table";
+
+ my $create_table;
+ if ( $timestamp ) {
+ $timestamp =~ s/^(\d{4})-(\d{2})-(\d{2}).*$/$1_$2_$3/;
+ my $part = $table . '_' . $timestamp;
+ $create_table = qq{
+ create table $part () inherits ($table) ;
+ };
+ $create_table .= $self->_create_index( $table, $timestamp );
+ $table = $part;
+ warn "# using partition $table";
+ }
+
+ if ( my $cols = $self->redis->get("pg.$table") ) {
@c = split(/\s+/,$cols);
} else {
my $sth = $self->dbh->prepare( "select * from $table limit 1" );
- $sth->execute;
+ eval { $sth->execute; };
+ if ( $@ ) {
+ warn "ERROR $@\n# try $create_table\n";
+ $self->dbh->do( $create_table );
+ $sth->execute;
+ }
+
@c = @{ $sth->{NAME_lc} };
- $redis->set( "pg.$table" => join(' ',@c) );
- $redis->expire( "pg.$table" => 5 * 60 ); # refresh every 5 min
+ $self->redis->set( "pg.$table" => join(' ',@c) );
+ $self->redis->expire( "pg.$table" => $redis_ttl );
+
+ # web interface requires base table columns
+ $self->redis->set( "pg.$base_table" => join(' ',@c) );
+ $self->redis->expire( "pg.$base_table" => $redis_ttl );
}
my $sql = "INSERT INTO $table (" . join(',',@c) . ') values (' . join(',', map { '?' } 0 .. $#c) . ')';
$h_lc->{ lc $_ } = $h->{$_} foreach keys %$h;
if ( my $username = $h->{username} ) {
- my $key = join('.', 'table', $table, $username);
- $redis->set( $key => $self->e_json($h) );
- $redis->expire( $key => 15 * 60 ); # 15 min timeout
+ my $key = join('.', 'table', $base_table, $username);
+ $self->redis->set( $key => $self->e_json($h) );
+ $self->redis->expire( $key => $redis_ttl );
}
+ $h_lc->{h} = $self->to_hstore( $h_lc->{h} ) if exists $h_lc->{h};
+
$sth->execute( map { $h_lc->{$_} } @c );
}