+++ /dev/null
-package CloudStore::Couchbase;
-use warnings;
-use strict;
-
-use autodie;
-use JSON::XS;
-use File::Path qw(make_path);
-use File::Slurp qw();
-use Cache::Memcached;
-use Digest::MD5 qw(md5_base64);
-use Data::Dump qw(dump);
-use LWP::Simple;
-use Carp qw(confess);
-
-use WarnColor;
-
-my $buckets = {
- users => 5800,
- files => 5801,
- session => 5802,
-};
-
-sub new {
- my ($class) = @_;
-
- my $self = {};
- bless $self, $class;
-
- foreach my $bucket ( keys %$buckets ) {
- my $port = $buckets->{$bucket};
- my $server = new Cache::Memcached {
- 'servers' => [ "127.0.0.1:$port" ],
- 'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
- # 'compress_threshold' => 10_000,
- };
- #$server->set_servers($array_ref);
- #$server->set_compress_threshold(10_000);
- $server->enable_compress(0);
- $self->{$bucket} = $server;
-
- }
-
- warn "# new ",dump $self if $ENV{DEBUG};
-
- return $self;
-}
-
-sub json_set {
- my ($self,$bucket,$key,$data) = @_;
- confess "data not ref ",dump($data) unless ref $data;
- my $json = encode_json $data;
- $self->{$bucket}->set( $key => $json );
- warn "## json_set $bucket $key $json\n";
- return $json;
-}
-
-sub json_get {
- my ($self,$bucket,$key,$data) = @_;
- if ( my $json = $self->{$bucket}->get($key) ) {
- warn "## json_get $bucket $key $json\n";
- return decode_json $json;
- }
-}
-
-sub user_set {
- my ($self,$data) = @_;
- $self->json_set( 'users', $data->{login}, $data );
-}
-
-sub user_get {
- my ($self,$login) = @_;
- $login = $login->{login} if ref $login;
- my $user = $self->json_get( 'users', $login );
- $user->{usage} = $self->usage( $login );
- $user->{status} = $self->status( $login );
- warn "## user ",dump($user) if $ENV{DEBUG};
- return $user;
-}
-
-sub status {
- my ($self,$login,$message) = @_;
- $login = $login->{login} if ref $login;
- if ( $message ) {
- $self->{session}->set( "$login:status" => $message );
- return $message;
- } else {
- $self->{session}->get( "$login:status" );
- }
-}
-
-sub usage_decr {
- my ($self,$data) = @_;
- $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
-}
-
-sub usage_incr {
- my ($self,$data) = @_;
- $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
-}
-
-sub usage {
- my ($self,$login) = @_;
- $login = $login->{login} if ref $login;
- $self->{session}->get( $login . ':usage' );
-}
-
-sub couchdb {
- my $self = shift @_;
- my $fmt = shift @_;
- my $url = sprintf $fmt, @_;
-
- warn "# couchdb $url\n";
- if ( my $json = get $url ) {
- warn "## $url $json\n";
- my $r = decode_json $json;
- return $r;
- }
-}
-
-sub usage_init {
- my ($self,$login) = @_;
- $login = $login->{login} if ref $login;
-
- my $usage = 0;
-
- if ( my $r = $self->couchdb(
- 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
- , $login
- , $login
- )) {
-
- $usage = $r->{rows}->[0]->{value};
- $usage = 0 unless defined $usage;
- }
-
- $self->{session}->set( $login . ':usage' => $usage );
-}
-
-sub _file_key {
- my $data = shift;
- #md5_base64( $data->{login} . '/' . $data->{file} );
- $data->{login} . ':' . $data->{file};
-}
-
-sub file_set {
- my ($self,$data) = @_;
- $self->json_set( 'files', _file_key($data), $data );
-}
-
-sub file_get {
- my ($self,$data) = @_;
- $self->json_get( 'files', _file_key($data) );
-}
-
-sub modify_file {
- my ( $self,$data ) = @_;
-
- if ( my $old = $self->file_get( $data ) ) {
- $self->usage_decr( $data );
- }
-
- $self->new_file($data);
-}
-
-sub new_file {
- my ( $self,$data ) = @_;
- $self->file_set($data);
- $self->usage_incr($data);
-}
-
-sub remove_file {
- my ( $self, $data ) = @_;
- $self->usage_decr( $data );
- my $k = _file_key $data;
- $self->{files}->delete( $k );
-}
-
-sub make_dir {
- my ( $self, $data ) = @_;
-
-}
-
-sub new_link {
- my ( $self, $data ) = @_;
-
- warn "# new_link ",dump $data;
-
- if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
- my ( $dir, $to, $name ) = ( $1, $2, $3 );
- my $path = "users/$data->{login}/blob/" . $data->{file};
- my $link_to = readlink $path;
- warn "$link_to";
- if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
-
- my $s = $path;
- $s =~ s{/[^/]+$}{}; # strip filename
- while ( $link_to =~ s{/../}{/} ) {
- $s =~ s{/[^/]+$}{} || die "can't strip $s";
- warn "## simplify $s $link_to\n";
- }
- $s .= $link_to;
-
- my $d = "users/$to/blob";
- if ( ! -e $d ) {
- warn "ERROR: no to user $to in $d";
- return;
- }
- $d .= "/$name";
-
- # $name can contain directories so we must create them
- my $to_dir = $d;
- $to_dir =~ s{/[^/]+$}{};
- make_path $to_dir if ! -e $to_dir;
-
- if ( ! -e $s ) {
- warn "ERROR: can't find source $s";
- } else {
-
- warn "link $s -> $d\n";
- link $s, $d;
-
- my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
-
- my $origin = $self->file_get({
- login => $l,
- file => $f,
- });
- $self->new_file($origin);
- warn "INFO: sent file ",dump($origin);
- }
-
-
- } else {
- warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
- }
- }
-}
-
-sub transfer {
- my ( $self,$data ) = @_;
-
- my $blob = "users/$data->{login}/blob";
- my $path = "$blob/$data->{file}";
-
- if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
- my $type = $1;
-
- if ( $type eq 'f' ) {
- $self->modify_file( $data );
- $self->dedup( $data, $path );
- } elsif ( $type eq 'd' ) {
- $self->make_dir( $data );
- } elsif ( $type eq 'L' ) {
- $self->new_link( $data );
- } else {
- die "unknown type $type ", dump $data;
- }
- } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
- $self->remove_file($data);
- }
- return $data;
-}
-
-sub md5pool {
- my ( $path, $md5 ) = @_;
-
- my $pool = 'md5'; # FIXME sharding?
- mkdir $pool unless -e $pool;
-
- if ( -e "$pool/$md5" ) {
- warn "dedup hit $md5 $path\n";
- my $dedup = $path . '.dedup';
- rename $path, $dedup;
- link "$pool/$md5", $path;
- unlink $dedup;
- } else {
- link $path, "$pool/$md5";
- }
-}
-
-my $empty_md5 = " " x 32;
-
-sub dedup {
- my ( $self, $data, $path ) = @_;
-
- if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
- my $dir = $1;
- my $imported = 0;
- warn "IMPORT ", $data->{file}, "\n";
- open(my $md5sum, '<', $path);
- while(<$md5sum>) {
- chomp;
- my ( $md5, $file ) = split(/\s+/,$_,2);
- if ( ! -e "md5/$md5" ) {
- warn "MISSING $md5 $file\n";
- next;
- }
- my $new = "users/$data->{login}/blob/$dir$file";
- if ( ! -e $new ) {
- # create path from md5sum file
- my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
- make_path $only_dir unless -d $only_dir;
- $imported += link "md5/$md5", $new;
- my $fake = {
- login => $data->{login},
- host => $data->{host},
- file => $dir . $file,
- md5 => $md5,
- size => -s $new,
- };
- $self->new_file($fake);
- warn "import from $path ",dump($fake);
- } else {
- md5pool $new => $md5;
- }
- }
- print "INFO imported $imported files from ",dump($data);
- }
-
- if ( $data->{md5} ne $empty_md5 ) {
- md5pool $path => $data->{md5};
- } else {
-
- }
-}
-
-1;
--- /dev/null
+package CloudStore::Store;
+use warnings;
+use strict;
+
+use autodie;
+use JSON::XS;
+use File::Path qw(make_path);
+use File::Slurp qw();
+use Cache::Memcached;
+use Digest::MD5 qw(md5_base64);
+use Data::Dump qw(dump);
+use LWP::Simple;
+use Carp qw(confess);
+
+use WarnColor;
+
+# Bucket name → local memcached/moxi port.  new() creates one
+# Cache::Memcached client per entry, all pointing at 127.0.0.1.
+my $buckets = {
+	users => 5800,
+	files => 5801,
+	session => 5802,
+};
+
+# Construct a store handle with one Cache::Memcached client per bucket
+# (users/files/session), each talking to 127.0.0.1 on the bucket's port.
+# Compression is disabled because values are small JSON blobs.
+sub new {
+	my ($class) = @_;
+
+	my $self = {};
+	bless $self, $class;
+
+	foreach my $bucket ( keys %$buckets ) {
+		my $port = $buckets->{$bucket};
+		# direct method call instead of indirect object syntax
+		# ("new Cache::Memcached {...}"), which Perl can mis-parse
+		my $server = Cache::Memcached->new({
+			'servers' => [ "127.0.0.1:$port" ],
+			'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
+			# 'compress_threshold' => 10_000,
+		});
+		#$server->set_servers($array_ref);
+		#$server->set_compress_threshold(10_000);
+		$server->enable_compress(0);
+		$self->{$bucket} = $server;
+
+	}
+
+	warn "# new ",dump $self if $ENV{DEBUG};
+
+	return $self;
+}
+
+# Serialize $data (must be a reference) to JSON and store it under $key
+# in the given bucket.  Returns the JSON string; confesses on non-ref data.
+sub json_set {
+	my ($self,$bucket,$key,$data) = @_;
+	confess "data not ref ",dump($data) unless ref $data;
+	my $json = encode_json $data;
+	$self->{$bucket}->set( $key => $json );
+	warn "## json_set $bucket $key $json\n";
+	return $json;
+}
+
+# Fetch $key from $bucket and JSON-decode it.
+# Returns the decoded structure, or undef/empty list on a cache miss.
+sub json_get {
+	my ($self,$bucket,$key) = @_;	# unused trailing $data argument dropped
+	if ( my $json = $self->{$bucket}->get($key) ) {
+		warn "## json_get $bucket $key $json\n";
+		return decode_json $json;
+	}
+	return;	# explicit miss result instead of falling off the if
+}
+
+# Persist a user record into the users bucket, keyed by its login.
+sub user_set {
+	my ($self,$data) = @_;
+	$self->json_set( 'users', $data->{login}, $data );
+}
+
+# Load a user record by login (accepts a login string or a hashref with
+# a {login} key) and decorate it with live usage and status values.
+# NOTE(review): on a cache miss json_get returns undef, so the two
+# assignments below autovivify a partial hashref instead of returning
+# undef -- confirm callers expect a hashref even for unknown logins.
+sub user_get {
+	my ($self,$login) = @_;
+	$login = $login->{login} if ref $login;
+	my $user = $self->json_get( 'users', $login );
+	$user->{usage} = $self->usage( $login );
+	$user->{status} = $self->status( $login );
+	warn "## user ",dump($user) if $ENV{DEBUG};
+	return $user;
+}
+
+# Combined getter/setter for the per-login status message in the
+# session bucket: with $message it stores and returns it, without it
+# returns whatever is currently stored under "login:status".
+sub status {
+	my ($self,$login,$message) = @_;
+	$login = $login->{login} if ref $login;
+	if ( $message ) {
+		$self->{session}->set( "$login:status" => $message );
+		return $message;
+	} else {
+		$self->{session}->get( "$login:status" );
+	}
+}
+
+# Decrease the login's usage counter by the record's size.
+# NOTE(review): memcached decr() is a no-op when the key is absent --
+# assumes usage_init() has seeded "login:usage"; verify against callers.
+sub usage_decr {
+	my ($self,$data) = @_;
+	$self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
+}
+
+# Increase the login's usage counter by the record's size.
+# NOTE(review): memcached incr() is a no-op when the key is absent --
+# assumes usage_init() has seeded "login:usage"; verify against callers.
+sub usage_incr {
+	my ($self,$data) = @_;
+	$self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
+}
+
+# Current usage counter for a login; accepts a login string or a
+# hashref with a {login} key.
+sub usage {
+	my ($self,$login) = @_;
+	$login = $login->{login} if ref $login;
+	$self->{session}->get( $login . ':usage' );
+}
+
+# sprintf the remaining arguments into $fmt to build a CouchDB URL,
+# GET it (LWP::Simple) and return the decoded JSON structure.
+# Returns undef/empty list when the request fails or yields no body.
+sub couchdb {
+	my $self = shift @_;
+	my $fmt = shift @_;
+	my $url = sprintf $fmt, @_;
+
+	warn "# couchdb $url\n";
+	if ( my $json = get $url ) {
+		warn "## $url $json\n";
+		my $r = decode_json $json;
+		return $r;
+	}
+}
+
+# Seed the login's "login:usage" counter in the session bucket from the
+# CouchDB login_usage view; falls back to 0 when the view returns no
+# row or CouchDB is unreachable.
+sub usage_init {
+	my ($self,$login) = @_;
+	$login = $login->{login} if ref $login;
+
+	my $usage = 0;
+
+	if ( my $r = $self->couchdb(
+		'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
+		, $login
+		, $login
+	)) {
+
+		$usage = $r->{rows}->[0]->{value};
+		$usage = 0 unless defined $usage;
+	}
+
+	$self->{session}->set( $login . ':usage' => $usage );
+}
+
+# Build the files-bucket key for a record: "login:file".
+# (md5_base64 keying was tried and abandoned -- see commented line.)
+sub _file_key {
+	my $data = shift;
+	#md5_base64( $data->{login} . '/' . $data->{file} );
+	$data->{login} . ':' . $data->{file};
+}
+
+# Store a file record in the files bucket under its "login:file" key.
+sub file_set {
+	my ($self,$data) = @_;
+	$self->json_set( 'files', _file_key($data), $data );
+}
+
+# Fetch a file record by its "login:file" key; undef on a miss.
+sub file_get {
+	my ($self,$data) = @_;
+	$self->json_get( 'files', _file_key($data) );
+}
+
+# Replace the stored record for a file.  If a previous record exists,
+# subtract the *old* size from the usage counter before new_file()
+# adds the new one.  (Decrementing by $data -- the new size, as the
+# previous code did -- made the counter drift whenever a file changed
+# size between transfers.)
+sub modify_file {
+	my ( $self,$data ) = @_;
+
+	if ( my $old = $self->file_get( $data ) ) {
+		$self->usage_decr( $old );
+	}
+
+	$self->new_file($data);
+}
+
+# Store a file record and add its size to the owner's usage counter.
+sub new_file {
+	my ( $self,$data ) = @_;
+	$self->file_set($data);
+	$self->usage_incr($data);
+}
+
+# Drop a file record and subtract its size from the usage counter.
+# NOTE(review): this trusts $data->{size} at deletion time -- confirm
+# the rsync hook still supplies the original size, otherwise usage
+# drifts (the stored record's size could be used instead).
+sub remove_file {
+	my ( $self, $data ) = @_;
+	$self->usage_decr( $data );
+	my $k = _file_key $data;
+	$self->{files}->delete( $k );
+}
+
+# Intentionally a no-op: directory creations carry no metadata yet,
+# but transfer() dispatches 'd' itemize types here.
+sub make_dir {
+	my ( $self, $data ) = @_;
+
+}
+
+# Handle an incoming symlink under ".send/<to>/<name>": resolve the
+# rsync-munged link target back to the sender's blob, hard-link that
+# file into the recipient's blob tree, and register it as a new file
+# for the recipient.
+sub new_link {
+	my ( $self, $data ) = @_;
+
+	warn "# new_link ",dump $data;
+
+	if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
+		my ( $dir, $to, $name ) = ( $1, $2, $3 );
+		my $path = "users/$data->{login}/blob/" . $data->{file};
+		my $link_to = readlink $path;
+		if ( ! defined $link_to ) {
+			# not a symlink, or it vanished -- avoid uninitialized
+			# warnings and a meaningless regex match below
+			warn "ERROR: can't readlink $path: $!";
+			return;
+		}
+		warn "$link_to";
+		if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
+
+			my $s = $path;
+			$s =~ s{/[^/]+$}{}; # strip filename
+			# collapse literal "/../" components; the dots must be
+			# escaped or the pattern would eat ANY two characters
+			while ( $link_to =~ s{/\.\./}{/} ) {
+				$s =~ s{/[^/]+$}{} || die "can't strip $s";
+				warn "## simplify $s $link_to\n";
+			}
+			$s .= $link_to;
+
+			my $d = "users/$to/blob";
+			if ( ! -e $d ) {
+				warn "ERROR: no to user $to in $d";
+				return;
+			}
+			$d .= "/$name";
+
+			# $name can contain directories so we must create them
+			my $to_dir = $d;
+			$to_dir =~ s{/[^/]+$}{};
+			make_path $to_dir if ! -e $to_dir;
+
+			if ( ! -e $s ) {
+				warn "ERROR: can't find source $s";
+			} else {
+
+				warn "link $s -> $d\n";
+				link $s, $d;
+
+				# "my ... = ... if COND" is undefined behaviour in
+				# Perl -- declare first, assign inside the match
+				my ($l,$f);
+				if ( $s =~ m{users/([^/]+)/blob/(.+)} ) {
+					($l,$f) = ($1,$2);
+				}
+
+				my $origin = $self->file_get({
+					login => $l,
+					file => $f,
+				});
+				if ( $origin ) {
+					$self->new_file($origin);
+					warn "INFO: sent file ",dump($origin);
+				} else {
+					# previously new_file(undef) would confess
+					# inside json_set and kill the hook
+					warn "ERROR: no metadata for sent file $s";
+				}
+			}
+
+
+		} else {
+			warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
+		}
+	}
+}
+
+# Dispatch one rsync itemize record: created/changed files are
+# recorded and deduped, directories and symlinks go to their handlers,
+# and "*deleting" lines remove the record.  Returns $data unchanged.
+sub transfer {
+	my ( $self,$data ) = @_;
+
+	my $blob = "users/$data->{login}/blob";
+	my $path = "$blob/$data->{file}";
+
+	if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
+		my $type = $1;
+
+		if ( $type eq 'f' ) {
+			$self->modify_file( $data );
+			$self->dedup( $data, $path );
+		} elsif ( $type eq 'd' ) {
+			$self->make_dir( $data );
+		} elsif ( $type eq 'L' ) {
+			$self->new_link( $data );
+		} else {
+			die "unknown type $type ", dump $data;
+		}
+	} elsif ( $data->{itemize} =~ m/\*deleting/ ) {
+		$self->remove_file($data);
+	}
+	return $data;
+}
+
+# Maintain a flat content-addressed pool under md5/: on a hit, replace
+# $path with a hard link to the pooled copy (via a temporary ".dedup"
+# rename so the data is never lost if link() fails); on a miss,
+# publish $path into the pool.
+# NOTE(review): rename+link+unlink is not atomic, and link() fails
+# across filesystems -- assumes pool and blobs share one filesystem.
+sub md5pool {
+	my ( $path, $md5 ) = @_;
+
+	my $pool = 'md5'; # FIXME sharding?
+	mkdir $pool unless -e $pool;
+
+	if ( -e "$pool/$md5" ) {
+		warn "dedup hit $md5 $path\n";
+		my $dedup = $path . '.dedup';
+		rename $path, $dedup;
+		link "$pool/$md5", $path;
+		unlink $dedup;
+	} else {
+		link $path, "$pool/$md5";
+	}
+}
+
+my $empty_md5 = " " x 32; # 32-space placeholder meaning "no digest supplied"
+
+# Post-transfer dedup hook.  Two jobs:
+#  1. if the transferred file is an "md5sum" listing, import each entry
+#     by hard-linking it out of the md5 pool into the user's blob tree
+#     (registering a synthetic file record for every new link);
+#  2. link the transferred file itself into the md5 pool.
+sub dedup {
+	my ( $self, $data, $path ) = @_;
+
+	if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
+		my $dir = $1;
+		$dir = '' unless defined $dir;	# top-level md5sum has no dir part
+		my $imported = 0;
+		warn "IMPORT ", $data->{file}, "\n";
+		open(my $md5sum, '<', $path);
+		while(<$md5sum>) {
+			chomp;
+			my ( $md5, $file ) = split(/\s+/,$_,2);
+			if ( ! -e "md5/$md5" ) {
+				warn "MISSING $md5 $file\n";
+				next;
+			}
+			my $new = "users/$data->{login}/blob/$dir$file";
+			if ( ! -e $new ) {
+				# create path from md5sum file; "my ... if" is
+				# undefined behaviour, so match explicitly
+				if ( $new =~ m{^(.+)/[^/]+$} ) {
+					my $only_dir = $1;
+					make_path $only_dir unless -d $only_dir;
+				}
+				$imported += link "md5/$md5", $new;
+				my $fake = {
+					login => $data->{login},
+					host => $data->{host},
+					file => $dir . $file,
+					md5 => $md5,
+					size => -s $new,
+				};
+				$self->new_file($fake);
+				warn "import from $path ",dump($fake);
+			} else {
+				md5pool $new => $md5;
+			}
+		}
+		close $md5sum;
+		print "INFO imported $imported files from ",dump($data);
+	}
+
+	# skip pooling when no digest was supplied (undef or the 32-space
+	# placeholder); previously undef produced a bogus "md5/" link
+	if ( defined $data->{md5} && $data->{md5} ne $empty_md5 ) {
+		md5pool $path => $data->{md5};
+	}
+}
+
+1;
use lib 'lib';
use WarnColor;
-use CloudStore::Couchbase;
+use CloudStore::Store;
my $dir = $ENV{RSYNC_DIR} || '/srv/cloudstore';
my $port = $ENV{RSYNC_PORT} || 6501;
exit 1;
}
-my $store = CloudStore::Couchbase->new;
+my $store = CloudStore::Store->new;
mkdir "$dir/var" if ! -e "$dir/var";
use Data::Dump qw(dump);
use lib '/srv/cloudstore/lib';
-use CloudStore::Couchbase;
+use CloudStore::Store;
-my $store = CloudStore::Couchbase->new;
+my $store = CloudStore::Store->new;
my $login = $ENV{RSYNC_MODULE_NAME} || die "no RSYNC_MODULE_NAME";
use Data::Dump qw(dump);
use lib 'lib';
-use CloudStore::Couchbase;
+use CloudStore::Store;
my $login = $ARGV[0] || die "usage: $0 login\n";
-my $store = CloudStore::Couchbase->new;
+my $store = CloudStore::Store->new;
my $offset = 0;
my $limit = $ENV{LIMIT} || 10000;