rename Couchbase to Store
authorDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 20 Sep 2011 12:09:08 +0000 (12:09 +0000)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Tue, 20 Sep 2011 12:09:08 +0000 (12:09 +0000)
this is first step in refactoring to remove Membase as back-end store

lib/CloudStore/Couchbase.pm [deleted file]
lib/CloudStore/Store.pm [new file with mode: 0644]
rsync-piper.pl
rsync-xfer-trigger.pl
store-fsck.pl

diff --git a/lib/CloudStore/Couchbase.pm b/lib/CloudStore/Couchbase.pm
deleted file mode 100644 (file)
index c59a1ac..0000000
+++ /dev/null
@@ -1,327 +0,0 @@
-package CloudStore::Couchbase;
-use warnings;
-use strict;
-
-use autodie;
-use JSON::XS;
-use File::Path qw(make_path);
-use File::Slurp qw();
-use Cache::Memcached;
-use Digest::MD5 qw(md5_base64);
-use Data::Dump qw(dump);
-use LWP::Simple;
-use Carp qw(confess);
-
-use WarnColor;
-
-my $buckets = {
-       users => 5800,
-       files => 5801,
-       session => 5802,
-};
-
-sub new {
-       my ($class) = @_;
-
-       my $self = {};
-       bless $self, $class;
-
-       foreach my $bucket ( keys %$buckets ) {
-               my $port = $buckets->{$bucket};
-               my $server = new Cache::Memcached {
-                       'servers' => [ "127.0.0.1:$port" ],
-                       'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
-               #       'compress_threshold' => 10_000,
-               };
-               #$server->set_servers($array_ref);
-               #$server->set_compress_threshold(10_000);
-               $server->enable_compress(0);
-               $self->{$bucket} = $server;
-
-       }
-
-       warn "# new ",dump $self if $ENV{DEBUG};
-
-       return $self;
-}
-
-sub json_set {
-       my ($self,$bucket,$key,$data) = @_;
-       confess "data not ref ",dump($data) unless ref $data;
-       my $json = encode_json $data;
-       $self->{$bucket}->set( $key => $json );
-       warn "## json_set $bucket $key $json\n";
-       return $json;
-}
-
-sub json_get {
-       my ($self,$bucket,$key,$data) = @_;
-       if ( my $json = $self->{$bucket}->get($key) ) {
-               warn "## json_get $bucket $key $json\n";
-               return decode_json $json;
-       }
-}
-
-sub user_set {
-       my ($self,$data) = @_;
-       $self->json_set( 'users', $data->{login}, $data );
-}
-
-sub user_get {
-       my ($self,$login) = @_;
-       $login = $login->{login} if ref $login;
-       my $user = $self->json_get( 'users', $login );
-       $user->{usage} = $self->usage( $login );
-       $user->{status} = $self->status( $login );
-       warn "## user ",dump($user) if $ENV{DEBUG};
-       return $user;
-}
-
-sub status {
-       my ($self,$login,$message) = @_;
-       $login = $login->{login} if ref $login;
-       if ( $message ) {
-               $self->{session}->set( "$login:status" => $message );
-               return $message;
-       } else {
-               $self->{session}->get( "$login:status" );
-       }
-}
-
-sub usage_decr {
-       my ($self,$data) = @_;
-       $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
-}
-
-sub usage_incr {
-       my ($self,$data) = @_;
-       $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
-}
-
-sub usage {
-       my ($self,$login) = @_;
-       $login = $login->{login} if ref $login;
-       $self->{session}->get( $login . ':usage' );
-}
-
-sub couchdb {
-       my $self = shift @_;
-       my $fmt  = shift @_;
-       my $url = sprintf $fmt, @_;
-
-       warn "# couchdb $url\n";
-       if ( my $json = get $url ) {
-               warn "## $url $json\n";
-               my $r = decode_json $json;
-               return $r;
-       }
-}
-
-sub usage_init {
-       my ($self,$login) = @_;
-       $login = $login->{login} if ref $login;
-
-       my $usage = 0;
-
-       if ( my $r = $self->couchdb(
-               'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
-               , $login
-               , $login
-       )) {
-
-               $usage = $r->{rows}->[0]->{value};
-               $usage = 0 unless defined $usage;
-       }
-
-       $self->{session}->set( $login . ':usage' => $usage );
-}
-
-sub _file_key {
-       my $data = shift;
-       #md5_base64( $data->{login} . '/' . $data->{file} );
-       $data->{login} . ':' . $data->{file};
-}
-
-sub file_set {
-       my ($self,$data) = @_;
-       $self->json_set( 'files', _file_key($data), $data );
-}
-
-sub file_get {
-       my ($self,$data) = @_;
-       $self->json_get( 'files', _file_key($data) );
-}
-
-sub modify_file {
-       my ( $self,$data ) = @_;
-
-       if ( my $old = $self->file_get( $data ) ) {
-               $self->usage_decr( $data );
-       }
-
-       $self->new_file($data);
-}
-
-sub new_file {
-       my ( $self,$data ) = @_;
-       $self->file_set($data);
-       $self->usage_incr($data);
-}
-
-sub remove_file {
-       my ( $self, $data ) = @_;
-       $self->usage_decr( $data );
-       my $k = _file_key $data;
-       $self->{files}->delete( $k );
-}
-
-sub make_dir {
-       my ( $self, $data ) = @_;
-
-}
-
-sub new_link {
-       my ( $self, $data ) = @_;
-
-       warn "# new_link ",dump $data;
-
-       if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
-               my ( $dir, $to, $name ) = ( $1, $2, $3 );
-               my $path = "users/$data->{login}/blob/" . $data->{file};
-               my $link_to = readlink $path;
-               warn "$link_to";
-               if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
-
-                       my $s = $path;
-                       $s =~ s{/[^/]+$}{}; # strip filename
-                       while ( $link_to =~ s{/../}{/} ) {
-                               $s =~ s{/[^/]+$}{} || die "can't strip $s";
-                               warn "## simplify $s $link_to\n";
-                       }
-                       $s .= $link_to;
-
-                       my $d = "users/$to/blob";
-                       if ( ! -e $d ) {
-                               warn "ERROR: no to user $to in $d";
-                               return;
-                       }
-                       $d .= "/$name";
-
-                       # $name can contain directories so we must create them
-                       my $to_dir = $d;
-                       $to_dir =~ s{/[^/]+$}{};
-                       make_path $to_dir if ! -e $to_dir;
-
-                       if ( ! -e $s ) {
-                               warn "ERROR: can't find source $s";
-                       } else {
-
-                               warn "link $s -> $d\n";
-                               link $s, $d;
-
-                               my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
-
-                               my $origin = $self->file_get({
-                                       login => $l,
-                                       file  => $f,
-                               });
-                               $self->new_file($origin);
-                               warn "INFO: sent file ",dump($origin);
-                       }
-
-
-               } else {
-                       warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
-               }
-       }
-}
-
-sub transfer {
-       my ( $self,$data ) = @_;
-
-       my $blob = "users/$data->{login}/blob";
-        my $path = "$blob/$data->{file}";
-
-       if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
-               my $type = $1;
-
-               if ( $type eq 'f' ) {
-                       $self->modify_file( $data );
-                       $self->dedup( $data, $path );
-               } elsif ( $type eq 'd' ) {
-                       $self->make_dir( $data );
-               } elsif ( $type eq 'L' ) {
-                       $self->new_link( $data );
-               } else {
-                       die "unknown type $type ", dump $data;
-               }
-       } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
-               $self->remove_file($data);
-       }
-       return $data;
-}
-
-sub md5pool {
-       my ( $path, $md5 ) = @_;
-
-       my $pool = 'md5'; # FIXME sharding?
-       mkdir $pool unless -e $pool;
-
-       if ( -e "$pool/$md5" ) {
-               warn "dedup hit $md5 $path\n";
-               my $dedup = $path . '.dedup';
-               rename $path, $dedup;
-               link "$pool/$md5", $path;
-               unlink $dedup;
-       } else {
-               link $path, "$pool/$md5";
-       }
-}
-
-my $empty_md5 = " " x 32;
-
-sub dedup {
-       my ( $self, $data, $path ) = @_;
-
-       if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
-               my $dir = $1;
-               my $imported = 0;
-               warn "IMPORT ", $data->{file}, "\n";
-               open(my $md5sum, '<', $path);
-               while(<$md5sum>) {
-                       chomp;
-                       my ( $md5, $file ) = split(/\s+/,$_,2);
-                       if ( ! -e "md5/$md5" ) {
-                               warn "MISSING $md5 $file\n";
-                               next;
-                       }
-                       my $new = "users/$data->{login}/blob/$dir$file";
-                       if ( ! -e $new ) {
-                               # create path from md5sum file
-                               my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
-                               make_path $only_dir unless -d $only_dir;
-                               $imported += link "md5/$md5", $new;
-                               my $fake = {
-                                       login => $data->{login},
-                                       host => $data->{host},
-                                       file => $dir . $file,
-                                       md5 => $md5,
-                                       size => -s $new,
-                               };
-                               $self->new_file($fake);
-                               warn "import from $path ",dump($fake);
-                       } else {
-                               md5pool $new => $md5;
-                       }
-               }
-               print "INFO imported $imported files from ",dump($data);
-       }
-
-       if ( $data->{md5} ne $empty_md5 ) {
-               md5pool $path => $data->{md5};
-       } else {
-               
-       }
-}
-
-1;
diff --git a/lib/CloudStore/Store.pm b/lib/CloudStore/Store.pm
new file mode 100644 (file)
index 0000000..401c95e
--- /dev/null
@@ -0,0 +1,327 @@
+package CloudStore::Store;
+use warnings;
+use strict;
+
+use autodie;
+use JSON::XS;
+use File::Path qw(make_path);
+use File::Slurp qw();
+use Cache::Memcached;
+use Digest::MD5 qw(md5_base64);
+use Data::Dump qw(dump);
+use LWP::Simple;
+use Carp qw(confess);
+
+use WarnColor;
+
+my $buckets = {
+       users => 5800,
+       files => 5801,
+       session => 5802,
+};
+
+sub new {
+       my ($class) = @_;
+
+       my $self = {};
+       bless $self, $class;
+
+       foreach my $bucket ( keys %$buckets ) {
+               my $port = $buckets->{$bucket};
+               my $server = new Cache::Memcached {
+                       'servers' => [ "127.0.0.1:$port" ],
+                       'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
+               #       'compress_threshold' => 10_000,
+               };
+               #$server->set_servers($array_ref);
+               #$server->set_compress_threshold(10_000);
+               $server->enable_compress(0);
+               $self->{$bucket} = $server;
+
+       }
+
+       warn "# new ",dump $self if $ENV{DEBUG};
+
+       return $self;
+}
+
+sub json_set {
+       my ($self,$bucket,$key,$data) = @_;
+       confess "data not ref ",dump($data) unless ref $data;
+       my $json = encode_json $data;
+       $self->{$bucket}->set( $key => $json );
+       warn "## json_set $bucket $key $json\n";
+       return $json;
+}
+
+sub json_get {
+       my ($self,$bucket,$key,$data) = @_;
+       if ( my $json = $self->{$bucket}->get($key) ) {
+               warn "## json_get $bucket $key $json\n";
+               return decode_json $json;
+       }
+}
+
+sub user_set {
+       my ($self,$data) = @_;
+       $self->json_set( 'users', $data->{login}, $data );
+}
+
+sub user_get {
+       my ($self,$login) = @_;
+       $login = $login->{login} if ref $login;
+       my $user = $self->json_get( 'users', $login );
+       $user->{usage} = $self->usage( $login );
+       $user->{status} = $self->status( $login );
+       warn "## user ",dump($user) if $ENV{DEBUG};
+       return $user;
+}
+
+sub status {
+       my ($self,$login,$message) = @_;
+       $login = $login->{login} if ref $login;
+       if ( $message ) {
+               $self->{session}->set( "$login:status" => $message );
+               return $message;
+       } else {
+               $self->{session}->get( "$login:status" );
+       }
+}
+
+sub usage_decr {
+       my ($self,$data) = @_;
+       $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
+}
+
+sub usage_incr {
+       my ($self,$data) = @_;
+       $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
+}
+
+sub usage {
+       my ($self,$login) = @_;
+       $login = $login->{login} if ref $login;
+       $self->{session}->get( $login . ':usage' );
+}
+
+sub couchdb {
+       my $self = shift @_;
+       my $fmt  = shift @_;
+       my $url = sprintf $fmt, @_;
+
+       warn "# couchdb $url\n";
+       if ( my $json = get $url ) {
+               warn "## $url $json\n";
+               my $r = decode_json $json;
+               return $r;
+       }
+}
+
+sub usage_init {
+       my ($self,$login) = @_;
+       $login = $login->{login} if ref $login;
+
+       my $usage = 0;
+
+       if ( my $r = $self->couchdb(
+               'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
+               , $login
+               , $login
+       )) {
+
+               $usage = $r->{rows}->[0]->{value};
+               $usage = 0 unless defined $usage;
+       }
+
+       $self->{session}->set( $login . ':usage' => $usage );
+}
+
+sub _file_key {
+       my $data = shift;
+       #md5_base64( $data->{login} . '/' . $data->{file} );
+       $data->{login} . ':' . $data->{file};
+}
+
+sub file_set {
+       my ($self,$data) = @_;
+       $self->json_set( 'files', _file_key($data), $data );
+}
+
+sub file_get {
+       my ($self,$data) = @_;
+       $self->json_get( 'files', _file_key($data) );
+}
+
+sub modify_file {
+       my ( $self,$data ) = @_;
+
+       if ( my $old = $self->file_get( $data ) ) {
+               $self->usage_decr( $data );
+       }
+
+       $self->new_file($data);
+}
+
+sub new_file {
+       my ( $self,$data ) = @_;
+       $self->file_set($data);
+       $self->usage_incr($data);
+}
+
+sub remove_file {
+       my ( $self, $data ) = @_;
+       $self->usage_decr( $data );
+       my $k = _file_key $data;
+       $self->{files}->delete( $k );
+}
+
+sub make_dir {
+       my ( $self, $data ) = @_;
+
+}
+
+sub new_link {
+       my ( $self, $data ) = @_;
+
+       warn "# new_link ",dump $data;
+
+       if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
+               my ( $dir, $to, $name ) = ( $1, $2, $3 );
+               my $path = "users/$data->{login}/blob/" . $data->{file};
+               my $link_to = readlink $path;
+               warn "$link_to";
+               if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
+
+                       my $s = $path;
+                       $s =~ s{/[^/]+$}{}; # strip filename
+                       while ( $link_to =~ s{/../}{/} ) {
+                               $s =~ s{/[^/]+$}{} || die "can't strip $s";
+                               warn "## simplify $s $link_to\n";
+                       }
+                       $s .= $link_to;
+
+                       my $d = "users/$to/blob";
+                       if ( ! -e $d ) {
+                               warn "ERROR: no to user $to in $d";
+                               return;
+                       }
+                       $d .= "/$name";
+
+                       # $name can contain directories so we must create them
+                       my $to_dir = $d;
+                       $to_dir =~ s{/[^/]+$}{};
+                       make_path $to_dir if ! -e $to_dir;
+
+                       if ( ! -e $s ) {
+                               warn "ERROR: can't find source $s";
+                       } else {
+
+                               warn "link $s -> $d\n";
+                               link $s, $d;
+
+                               my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
+
+                               my $origin = $self->file_get({
+                                       login => $l,
+                                       file  => $f,
+                               });
+                               $self->new_file($origin);
+                               warn "INFO: sent file ",dump($origin);
+                       }
+
+
+               } else {
+                       warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
+               }
+       }
+}
+
+sub transfer {
+       my ( $self,$data ) = @_;
+
+       my $blob = "users/$data->{login}/blob";
+        my $path = "$blob/$data->{file}";
+
+       if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
+               my $type = $1;
+
+               if ( $type eq 'f' ) {
+                       $self->modify_file( $data );
+                       $self->dedup( $data, $path );
+               } elsif ( $type eq 'd' ) {
+                       $self->make_dir( $data );
+               } elsif ( $type eq 'L' ) {
+                       $self->new_link( $data );
+               } else {
+                       die "unknown type $type ", dump $data;
+               }
+       } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
+               $self->remove_file($data);
+       }
+       return $data;
+}
+
+sub md5pool {
+       my ( $path, $md5 ) = @_;
+
+       my $pool = 'md5'; # FIXME sharding?
+       mkdir $pool unless -e $pool;
+
+       if ( -e "$pool/$md5" ) {
+               warn "dedup hit $md5 $path\n";
+               my $dedup = $path . '.dedup';
+               rename $path, $dedup;
+               link "$pool/$md5", $path;
+               unlink $dedup;
+       } else {
+               link $path, "$pool/$md5";
+       }
+}
+
+my $empty_md5 = " " x 32;
+
+sub dedup {
+       my ( $self, $data, $path ) = @_;
+
+       if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
+               my $dir = $1;
+               my $imported = 0;
+               warn "IMPORT ", $data->{file}, "\n";
+               open(my $md5sum, '<', $path);
+               while(<$md5sum>) {
+                       chomp;
+                       my ( $md5, $file ) = split(/\s+/,$_,2);
+                       if ( ! -e "md5/$md5" ) {
+                               warn "MISSING $md5 $file\n";
+                               next;
+                       }
+                       my $new = "users/$data->{login}/blob/$dir$file";
+                       if ( ! -e $new ) {
+                               # create path from md5sum file
+                               my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
+                               make_path $only_dir unless -d $only_dir;
+                               $imported += link "md5/$md5", $new;
+                               my $fake = {
+                                       login => $data->{login},
+                                       host => $data->{host},
+                                       file => $dir . $file,
+                                       md5 => $md5,
+                                       size => -s $new,
+                               };
+                               $self->new_file($fake);
+                               warn "import from $path ",dump($fake);
+                       } else {
+                               md5pool $new => $md5;
+                       }
+               }
+               print "INFO imported $imported files from ",dump($data);
+       }
+
+       if ( $data->{md5} ne $empty_md5 ) {
+               md5pool $path => $data->{md5};
+       } else {
+               
+       }
+}
+
+1;
index 1aaca5b..ed6d11b 100755 (executable)
@@ -13,7 +13,7 @@ use Module::Refresh;
 
 use lib 'lib';
 use WarnColor;
-use CloudStore::Couchbase;
+use CloudStore::Store;
 
 my $dir   = $ENV{RSYNC_DIR}  || '/srv/cloudstore';
 my $port  = $ENV{RSYNC_PORT} || 6501;
@@ -56,7 +56,7 @@ if ( $ENV{SQL} ) {
        exit 1;
 }
 
-my $store = CloudStore::Couchbase->new;
+my $store = CloudStore::Store->new;
 
 mkdir "$dir/var" if ! -e "$dir/var";
 
index 97018e4..9f9f370 100755 (executable)
@@ -5,9 +5,9 @@ use strict;
 use Data::Dump qw(dump);
 
 use lib '/srv/cloudstore/lib';
-use CloudStore::Couchbase;
+use CloudStore::Store;
 
-my $store = CloudStore::Couchbase->new;
+my $store = CloudStore::Store->new;
 
 my $login = $ENV{RSYNC_MODULE_NAME} || die "no RSYNC_MODULE_NAME";
 
index eb376d7..ebbc981 100755 (executable)
@@ -5,11 +5,11 @@ use strict;
 use Data::Dump qw(dump);
 
 use lib 'lib';
-use CloudStore::Couchbase;
+use CloudStore::Store;
 
 my $login = $ARGV[0] || die "usage: $0 login\n";
 
-my $store = CloudStore::Couchbase->new;
+my $store = CloudStore::Store->new;
 
 my $offset = 0;
 my $limit  = $ENV{LIMIT} || 10000;