From: Dobrica Pavlinusic Date: Tue, 20 Sep 2011 12:09:08 +0000 (+0000) Subject: rename Couchbase to Store X-Git-Url: http://git.rot13.org/?a=commitdiff_plain;h=839e8157a504aaaa19803de383ba4a64e3eb8754;hp=aef852f6c414751888eab77d5b81a7e667daa079;p=cloudstore.git rename Couchbase to Store this is first step in refactoring to remove Membase as back-end store --- diff --git a/lib/CloudStore/Couchbase.pm b/lib/CloudStore/Couchbase.pm deleted file mode 100644 index c59a1ac..0000000 --- a/lib/CloudStore/Couchbase.pm +++ /dev/null @@ -1,327 +0,0 @@ -package CloudStore::Couchbase; -use warnings; -use strict; - -use autodie; -use JSON::XS; -use File::Path qw(make_path); -use File::Slurp qw(); -use Cache::Memcached; -use Digest::MD5 qw(md5_base64); -use Data::Dump qw(dump); -use LWP::Simple; -use Carp qw(confess); - -use WarnColor; - -my $buckets = { - users => 5800, - files => 5801, - session => 5802, -}; - -sub new { - my ($class) = @_; - - my $self = {}; - bless $self, $class; - - foreach my $bucket ( keys %$buckets ) { - my $port = $buckets->{$bucket}; - my $server = new Cache::Memcached { - 'servers' => [ "127.0.0.1:$port" ], - 'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3, - # 'compress_threshold' => 10_000, - }; - #$server->set_servers($array_ref); - #$server->set_compress_threshold(10_000); - $server->enable_compress(0); - $self->{$bucket} = $server; - - } - - warn "# new ",dump $self if $ENV{DEBUG}; - - return $self; -} - -sub json_set { - my ($self,$bucket,$key,$data) = @_; - confess "data not ref ",dump($data) unless ref $data; - my $json = encode_json $data; - $self->{$bucket}->set( $key => $json ); - warn "## json_set $bucket $key $json\n"; - return $json; -} - -sub json_get { - my ($self,$bucket,$key,$data) = @_; - if ( my $json = $self->{$bucket}->get($key) ) { - warn "## json_get $bucket $key $json\n"; - return decode_json $json; - } -} - -sub user_set { - my ($self,$data) = @_; - $self->json_set( 'users', $data->{login}, $data ); -} - -sub user_get { - my ($self,$login) = @_; - $login = $login->{login} if ref $login; - my $user = $self->json_get( 'users', $login ); - $user->{usage} = $self->usage( $login ); - $user->{status} = $self->status( $login ); - warn "## user ",dump($user) if $ENV{DEBUG}; - return $user; -} - -sub status { - my ($self,$login,$message) = @_; - $login = $login->{login} if ref $login; - if ( $message ) { - $self->{session}->set( "$login:status" => $message ); - return $message; - } else { - $self->{session}->get( "$login:status" ); - } -} - -sub usage_decr { - my ($self,$data) = @_; - $self->{session}->decr( $data->{login} . ':usage' => $data->{size} ); -} - -sub usage_incr { - my ($self,$data) = @_; - $self->{session}->incr( $data->{login} . ':usage' => $data->{size} ); -} - -sub usage { - my ($self,$login) = @_; - $login = $login->{login} if ref $login; - $self->{session}->get( $login . ':usage' ); -} - -sub couchdb { - my $self = shift @_; - my $fmt = shift @_; - my $url = sprintf $fmt, @_; - - warn "# couchdb $url\n"; - if ( my $json = get $url ) { - warn "## $url $json\n"; - my $r = decode_json $json; - return $r; - } -} - -sub usage_init { - my ($self,$login) = @_; - $login = $login->{login} if ref $login; - - my $usage = 0; - - if ( my $r = $self->couchdb( - 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after' - , $login - , $login - )) { - - $usage = $r->{rows}->[0]->{value}; - $usage = 0 unless defined $usage; - } - - $self->{session}->set( $login . ':usage' => $usage ); -} - -sub _file_key { - my $data = shift; - #md5_base64( $data->{login} . '/' . $data->{file} ); - $data->{login} . ':' . $data->{file}; -} - -sub file_set { - my ($self,$data) = @_; - $self->json_set( 'files', _file_key($data), $data ); -} - -sub file_get { - my ($self,$data) = @_; - $self->json_get( 'files', _file_key($data) ); -} - -sub modify_file { - my ( $self,$data ) = @_; - - if ( my $old = $self->file_get( $data ) ) { - $self->usage_decr( $data ); - } - - $self->new_file($data); -} - -sub new_file { - my ( $self,$data ) = @_; - $self->file_set($data); - $self->usage_incr($data); -} - -sub remove_file { - my ( $self, $data ) = @_; - $self->usage_decr( $data ); - my $k = _file_key $data; - $self->{files}->delete( $k ); -} - -sub make_dir { - my ( $self, $data ) = @_; - -} - -sub new_link { - my ( $self, $data ) = @_; - - warn "# new_link ",dump $data; - - if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) { - my ( $dir, $to, $name ) = ( $1, $2, $3 ); - my $path = "users/$data->{login}/blob/" . $data->{file}; - my $link_to = readlink $path; - warn "$link_to"; - if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) { - - my $s = $path; - $s =~ s{/[^/]+$}{}; # strip filename - while ( $link_to =~ s{/../}{/} ) { - $s =~ s{/[^/]+$}{} || die "can't strip $s"; - warn "## simplify $s $link_to\n"; - } - $s .= $link_to; - - my $d = "users/$to/blob"; - if ( ! -e $d ) { - warn "ERROR: no to user $to in $d"; - return; - } - $d .= "/$name"; - - # $name can contain directories so we must create them - my $to_dir = $d; - $to_dir =~ s{/[^/]+$}{}; - make_path $to_dir if ! -e $to_dir; - - if ( ! -e $s ) { - warn "ERROR: can't find source $s"; - } else { - - warn "link $s -> $d\n"; - link $s, $d; - - my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)}; - - my $origin = $self->file_get({ - login => $l, - file => $f, - }); - $self->new_file($origin); - warn "INFO: sent file ",dump($origin); - } - - - } else { - warn "ERROR: can't SEND To:$to Name:$name Link:$link_to"; - } - } -} - -sub transfer { - my ( $self,$data ) = @_; - - my $blob = "users/$data->{login}/blob"; - my $path = "$blob/$data->{file}"; - - if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create - my $type = $1; - - if ( $type eq 'f' ) { - $self->modify_file( $data ); - $self->dedup( $data, $path ); - } elsif ( $type eq 'd' ) { - $self->make_dir( $data ); - } elsif ( $type eq 'L' ) { - $self->new_link( $data ); - } else { - die "unknown type $type ", dump $data; - } - } elsif ( $data->{itemize} =~ m/\*deleting/ ) { - $self->remove_file($data); - } - return $data; -} - -sub md5pool { - my ( $path, $md5 ) = @_; - - my $pool = 'md5'; # FIXME sharding? - mkdir $pool unless -e $pool; - - if ( -e "$pool/$md5" ) { - warn "dedup hit $md5 $path\n"; - my $dedup = $path . '.dedup'; - rename $path, $dedup; - link "$pool/$md5", $path; - unlink $dedup; - } else { - link $path, "$pool/$md5"; - } -} - -my $empty_md5 = " " x 32; - -sub dedup { - my ( $self, $data, $path ) = @_; - - if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) { - my $dir = $1; - my $imported = 0; - warn "IMPORT ", $data->{file}, "\n"; - open(my $md5sum, '<', $path); - while(<$md5sum>) { - chomp; - my ( $md5, $file ) = split(/\s+/,$_,2); - if ( ! -e "md5/$md5" ) { - warn "MISSING $md5 $file\n"; - next; - } - my $new = "users/$data->{login}/blob/$dir$file"; - if ( ! -e $new ) { - # create path from md5sum file - my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$}; - make_path $only_dir unless -d $only_dir; - $imported += link "md5/$md5", $new; - my $fake = { - login => $data->{login}, - host => $data->{host}, - file => $dir . $file, - md5 => $md5, - size => -s $new, - }; - $self->new_file($fake); - warn "import from $path ",dump($fake); - } else { - md5pool $new => $md5; - } - } - print "INFO imported $imported files from ",dump($data); - } - - if ( $data->{md5} ne $empty_md5 ) { - md5pool $path => $data->{md5}; - } else { - - } -} - -1; diff --git a/lib/CloudStore/Store.pm b/lib/CloudStore/Store.pm new file mode 100644 index 0000000..401c95e --- /dev/null +++ b/lib/CloudStore/Store.pm @@ -0,0 +1,327 @@ +package CloudStore::Store; +use warnings; +use strict; + +use autodie; +use JSON::XS; +use File::Path qw(make_path); +use File::Slurp qw(); +use Cache::Memcached; +use Digest::MD5 qw(md5_base64); +use Data::Dump qw(dump); +use LWP::Simple; +use Carp qw(confess); + +use WarnColor; + +my $buckets = { + users => 5800, + files => 5801, + session => 5802, +}; + +sub new { + my ($class) = @_; + + my $self = {}; + bless $self, $class; + + foreach my $bucket ( keys %$buckets ) { + my $port = $buckets->{$bucket}; + my $server = new Cache::Memcached { + 'servers' => [ "127.0.0.1:$port" ], + 'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3, + # 'compress_threshold' => 10_000, + }; + #$server->set_servers($array_ref); + #$server->set_compress_threshold(10_000); + $server->enable_compress(0); + $self->{$bucket} = $server; + + } + + warn "# new ",dump $self if $ENV{DEBUG}; + + return $self; +} + +sub json_set { + my ($self,$bucket,$key,$data) = @_; + confess "data not ref ",dump($data) unless ref $data; + my $json = encode_json $data; + $self->{$bucket}->set( $key => $json ); + warn "## json_set $bucket $key $json\n"; + return $json; +} + +sub json_get { + my ($self,$bucket,$key,$data) = @_; + if ( my $json = $self->{$bucket}->get($key) ) { + warn "## json_get $bucket $key $json\n"; + return decode_json $json; + } +} + +sub user_set { + my ($self,$data) = @_; + $self->json_set( 'users', $data->{login}, $data ); +} + +sub user_get { + my ($self,$login) = @_; + $login = $login->{login} if ref $login; + my $user = $self->json_get( 'users', $login ); + $user->{usage} = $self->usage( $login ); + $user->{status} = $self->status( $login ); + warn "## user ",dump($user) if $ENV{DEBUG}; + return $user; +} + +sub status { + my ($self,$login,$message) = @_; + $login = $login->{login} if ref $login; + if ( $message ) { + $self->{session}->set( "$login:status" => $message ); + return $message; + } else { + $self->{session}->get( "$login:status" ); + } +} + +sub usage_decr { + my ($self,$data) = @_; + $self->{session}->decr( $data->{login} . ':usage' => $data->{size} ); +} + +sub usage_incr { + my ($self,$data) = @_; + $self->{session}->incr( $data->{login} . ':usage' => $data->{size} ); +} + +sub usage { + my ($self,$login) = @_; + $login = $login->{login} if ref $login; + $self->{session}->get( $login . ':usage' ); +} + +sub couchdb { + my $self = shift @_; + my $fmt = shift @_; + my $url = sprintf $fmt, @_; + + warn "# couchdb $url\n"; + if ( my $json = get $url ) { + warn "## $url $json\n"; + my $r = decode_json $json; + return $r; + } +} + +sub usage_init { + my ($self,$login) = @_; + $login = $login->{login} if ref $login; + + my $usage = 0; + + if ( my $r = $self->couchdb( + 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after' + , $login + , $login + )) { + + $usage = $r->{rows}->[0]->{value}; + $usage = 0 unless defined $usage; + } + + $self->{session}->set( $login . ':usage' => $usage ); +} + +sub _file_key { + my $data = shift; + #md5_base64( $data->{login} . '/' . $data->{file} ); + $data->{login} . ':' . $data->{file}; +} + +sub file_set { + my ($self,$data) = @_; + $self->json_set( 'files', _file_key($data), $data ); +} + +sub file_get { + my ($self,$data) = @_; + $self->json_get( 'files', _file_key($data) ); +} + +sub modify_file { + my ( $self,$data ) = @_; + + if ( my $old = $self->file_get( $data ) ) { + $self->usage_decr( $data ); + } + + $self->new_file($data); +} + +sub new_file { + my ( $self,$data ) = @_; + $self->file_set($data); + $self->usage_incr($data); +} + +sub remove_file { + my ( $self, $data ) = @_; + $self->usage_decr( $data ); + my $k = _file_key $data; + $self->{files}->delete( $k ); +} + +sub make_dir { + my ( $self, $data ) = @_; + +} + +sub new_link { + my ( $self, $data ) = @_; + + warn "# new_link ",dump $data; + + if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) { + my ( $dir, $to, $name ) = ( $1, $2, $3 ); + my $path = "users/$data->{login}/blob/" . $data->{file}; + my $link_to = readlink $path; + warn "$link_to"; + if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) { + + my $s = $path; + $s =~ s{/[^/]+$}{}; # strip filename + while ( $link_to =~ s{/../}{/} ) { + $s =~ s{/[^/]+$}{} || die "can't strip $s"; + warn "## simplify $s $link_to\n"; + } + $s .= $link_to; + + my $d = "users/$to/blob"; + if ( ! -e $d ) { + warn "ERROR: no to user $to in $d"; + return; + } + $d .= "/$name"; + + # $name can contain directories so we must create them + my $to_dir = $d; + $to_dir =~ s{/[^/]+$}{}; + make_path $to_dir if ! -e $to_dir; + + if ( ! -e $s ) { + warn "ERROR: can't find source $s"; + } else { + + warn "link $s -> $d\n"; + link $s, $d; + + my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)}; + + my $origin = $self->file_get({ + login => $l, + file => $f, + }); + $self->new_file($origin); + warn "INFO: sent file ",dump($origin); + } + + + } else { + warn "ERROR: can't SEND To:$to Name:$name Link:$link_to"; + } + } +} + +sub transfer { + my ( $self,$data ) = @_; + + my $blob = "users/$data->{login}/blob"; + my $path = "$blob/$data->{file}"; + + if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create + my $type = $1; + + if ( $type eq 'f' ) { + $self->modify_file( $data ); + $self->dedup( $data, $path ); + } elsif ( $type eq 'd' ) { + $self->make_dir( $data ); + } elsif ( $type eq 'L' ) { + $self->new_link( $data ); + } else { + die "unknown type $type ", dump $data; + } + } elsif ( $data->{itemize} =~ m/\*deleting/ ) { + $self->remove_file($data); + } + return $data; +} + +sub md5pool { + my ( $path, $md5 ) = @_; + + my $pool = 'md5'; # FIXME sharding? + mkdir $pool unless -e $pool; + + if ( -e "$pool/$md5" ) { + warn "dedup hit $md5 $path\n"; + my $dedup = $path . '.dedup'; + rename $path, $dedup; + link "$pool/$md5", $path; + unlink $dedup; + } else { + link $path, "$pool/$md5"; + } +} + +my $empty_md5 = " " x 32; + +sub dedup { + my ( $self, $data, $path ) = @_; + + if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) { + my $dir = $1; + my $imported = 0; + warn "IMPORT ", $data->{file}, "\n"; + open(my $md5sum, '<', $path); + while(<$md5sum>) { + chomp; + my ( $md5, $file ) = split(/\s+/,$_,2); + if ( ! -e "md5/$md5" ) { + warn "MISSING $md5 $file\n"; + next; + } + my $new = "users/$data->{login}/blob/$dir$file"; + if ( ! -e $new ) { + # create path from md5sum file + my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$}; + make_path $only_dir unless -d $only_dir; + $imported += link "md5/$md5", $new; + my $fake = { + login => $data->{login}, + host => $data->{host}, + file => $dir . $file, + md5 => $md5, + size => -s $new, + }; + $self->new_file($fake); + warn "import from $path ",dump($fake); + } else { + md5pool $new => $md5; + } + } + print "INFO imported $imported files from ",dump($data); + } + + if ( $data->{md5} ne $empty_md5 ) { + md5pool $path => $data->{md5}; + } else { + + } +} + +1; diff --git a/rsync-piper.pl b/rsync-piper.pl index 1aaca5b..ed6d11b 100755 --- a/rsync-piper.pl +++ b/rsync-piper.pl @@ -13,7 +13,7 @@ use Module::Refresh; use lib 'lib'; use WarnColor; -use CloudStore::Couchbase; +use CloudStore::Store; my $dir = $ENV{RSYNC_DIR} || '/srv/cloudstore'; my $port = $ENV{RSYNC_PORT} || 6501; @@ -56,7 +56,7 @@ if ( $ENV{SQL} ) { exit 1; } -my $store = CloudStore::Couchbase->new; +my $store = CloudStore::Store->new; mkdir "$dir/var" if ! -e "$dir/var"; diff --git a/rsync-xfer-trigger.pl b/rsync-xfer-trigger.pl index 97018e4..9f9f370 100755 --- a/rsync-xfer-trigger.pl +++ b/rsync-xfer-trigger.pl @@ -5,9 +5,9 @@ use strict; use Data::Dump qw(dump); use lib '/srv/cloudstore/lib'; -use CloudStore::Couchbase; +use CloudStore::Store; -my $store = CloudStore::Couchbase->new; +my $store = CloudStore::Store->new; my $login = $ENV{RSYNC_MODULE_NAME} || die "no RSYNC_MODULE_NAME"; diff --git a/store-fsck.pl b/store-fsck.pl index eb376d7..ebbc981 100755 --- a/store-fsck.pl +++ b/store-fsck.pl @@ -5,11 +5,11 @@ use strict; use Data::Dump qw(dump); use lib 'lib'; -use CloudStore::Couchbase; +use CloudStore::Store; my $login = $ARGV[0] || die "usage: $0 login\n"; -my $store = CloudStore::Couchbase->new; +my $store = CloudStore::Store->new; my $offset = 0; my $limit = $ENV{LIMIT} || 10000;