use warnings;
use strict;
+use lib 'lib';
+use base qw(CloudStore::MD5sum);
+use CloudStore::API;
+
use autodie;
use JSON::XS;
use File::Path qw(make_path);
use File::Slurp qw();
-use Cache::Memcached;
use Digest::MD5 qw(md5_base64);
use Data::Dump qw(dump);
-use LWP::Simple;
use Carp qw(confess);
use WarnColor;
-my $buckets = {
- users => 5800,
- files => 5801,
- session => 5802,
-};
-
+# Constructor.
+#
+# $group is passed straight to CloudStore::API->new; the resulting object is
+# stored in $self->{api}.  The md5 pool directory is read from
+# $self->{api}->{md5}->{dir} and cached in $self->{md5pool}.
+# When $ENV{DEBUG} is true the freshly built object is dumped via warn.
sub new {
- my ($class) = @_;
+ my ($class,$group) = @_;
- my $self = {};
+ my $self = {
+ api => CloudStore::API->new( $group ),
+ };
bless $self, $class;
- foreach my $bucket ( keys %$buckets ) {
- my $port = $buckets->{$bucket};
- my $server = new Cache::Memcached {
- 'servers' => [ "127.0.0.1:$port" ],
- 'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
- # 'compress_threshold' => 10_000,
- };
- #$server->set_servers($array_ref);
- #$server->set_compress_threshold(10_000);
- $server->enable_compress(0);
- $self->{$bucket} = $server;
-
- }
+# cache the pool directory so the dedup code does not reach into the API object
+ $self->{md5pool} = $self->{api}->{md5}->{dir};
warn "# new ",dump $self if $ENV{DEBUG};
return $self;
}
-sub json_set {
- my ($self,$bucket,$key,$data) = @_;
- confess "data not ref ",dump($data) unless ref $data;
- my $json = encode_json $data;
- $self->{$bucket}->set( $key => $json );
- warn "## json_set $bucket $key $json\n";
- return $json;
-}
+# Accessor: returns the API object stored under the 'api' key of the instance.
+sub api {
+    my ($self) = @_;
+    return $self->{api};
+}
-sub json_get {
- my ($self,$bucket,$key,$data) = @_;
- if ( my $json = $self->{$bucket}->get($key) ) {
- warn "## json_get $bucket $key $json\n";
- return decode_json $json;
- }
+# Make sure the directory that will contain $path exists.
+#
+# The last path component is treated as a file name and stripped; every
+# missing directory above it is then created.  Callers pass paths whose file
+# part can itself contain sub-directories, so make_path (File::Path) is used
+# rather than a single-level mkdir, which fails when a parent is missing.
+#
+# A path without any directory component, or a file sitting directly under
+# "/", leaves nothing to create and is ignored.
+#
+# Arguments: the full path of a file.
+# Returns:   nothing useful; make_path croaks if a directory can't be created.
+sub mkbasedir {
+    my $dir = shift;
+    return unless defined $dir;
+    return unless $dir =~ s{/[^/]+$}{};    # strip filename
+    return if $dir eq '';                  # file directly under /
+    make_path($dir) unless -e $dir;
+    return;
}
-sub user_set {
- my ($self,$data) = @_;
- $self->json_set( 'users', $data->{login}, $data );
-}
+# Decide whether a received file should go through deduplication.
+#
+# Arguments: $data - hash ref describing the transferred item; only
+#                    $data->{file} is inspected here.
+# Returns:   0 for anything below a ".sync/" directory (at the top level or
+#            nested), 1 for every other file.  rsync_transfer calls dedup
+#            only when this returns true.
+#
+# The .sync/send and .sync/pending handling between "=for removed" and
+# "=cut" is POD, so perl skips it; it is kept for reference only.
+sub modify_file {
+ my ( $self,$data ) = @_;
-sub user_get {
- my ($self,$login) = @_;
- $login = $login->{login} if ref $login;
- my $user = $self->json_get( 'users', $login );
- $user->{usage} = $self->usage( $login );
- $user->{status} = $self->status( $login );
- warn "## user ",dump($user) if $ENV{DEBUG};
- return $user;
-}
+=for removed
-sub status {
- my ($self,$login,$message) = @_;
- $login = $login->{login} if ref $login;
- if ( $message ) {
- $self->{session}->set( "$login:status" => $message );
- return $message;
- } else {
- $self->{session}->get( "$login:status" );
- }
-}
+ if ( $data->{file} =~ m{^(.*/)?.sync/send/([^/]+)$} ) {
+ my $from_dir = $1;
+ warn "SEND $2 from $from_dir\n";
+ my $sent_files;
+ open(my $send, '<', $self->blob_path($data) );
+ while(<$send>) {
+ s/[\n\r]+$//;
-sub usage_decr {
- my ($self,$data) = @_;
- $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
-}
+ my ( $to, $file ) = split(/\s+/,$_,2);
+ my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
+ getpwnam $to;
-sub usage_incr {
- my ($self,$data) = @_;
- $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
-}
+ my $from = $data;
+ $from->{file} = $from_dir . $file;
+ my $from_path = $self->blob_path($from);
-sub usage {
- my ($self,$login) = @_;
- $login = $login->{login} if ref $login;
- $self->{session}->get( $login . ':usage' );
-}
+ if ( ! -r $from_path ) {
+ warn "ERROR: $from_path: $!";
+ next;
+ }
-sub couchdb {
- my $self = shift @_;
- my $fmt = shift @_;
- my $url = sprintf $fmt, @_;
-
- warn "# couchdb $url\n";
- if ( my $json = get $url ) {
- warn "## $url $json\n";
- my $r = decode_json $json;
- return $r;
- }
-}
+ my $to_path = "$dir/received/$file";
+ mkbasedir $to_path;
-sub usage_init {
- my ($self,$login) = @_;
- $login = $login->{login} if ref $login;
+ warn "SEND $from_path -> $to_path\n";
+ unlink $to_path if -e $to_path; # FIXME why we need this?
+ $sent_files->{$to} += link $from_path, $to_path;
+ # FIXME cross-shard
+ my $md5 = $self->md5sum($data)->get( $from_dir . $file ) || warn "no md5 for $from_dir$file";
+ $self->md5sum({login => $to})->put( "/received/$file" => $md5 );
+ }
- my $usage = 0;
+ warn "SENT ",dump $sent_files;
+
+ return 0; # skip dedup
+ } elsif ( $data->{file} =~ m{^(.*/)?.sync/pending/([^/]+)$} ) {
+ my $from_dir = $1;
+ warn "PENDIG $2 from $from_dir";
+ open(my $pend, '<', $self->blob_path($data) );
+ while(<$pend>) {
+ s/[\n\r]+$//;
+
+ if ( m/^DELETED\#(.+)$/ ) {
+ my $path = $self->blob_path($data => $from_dir . $1 );
+ if ( -e $path ) {
+ warn "UNLINK $path";
+ -d $path ? rmdir $path : unlink $path || warn "ERROR: unlink $path $!";
+ next;
+ } else {
+ warn "MISSING $path to unlink";
+ next;
+ }
+ } elsif ( ! /^(MOVED|RENAMED)\#/ ) {
+ warn "skip $_\n";
+ next;
+ }
- if ( my $r = $self->couchdb(
- 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
- , $login
- , $login
- )) {
+ my ( undef, $from, $to ) = split(/\#/,$_,3);
- $usage = $r->{rows}->[0]->{value};
- $usage = 0 unless defined $usage;
- }
+ my ( $from_path, $to_path ) = map {
+ my $tmp = $data;
+ $tmp->{file} = $from_dir . $_;
+ $self->blob_path($tmp);
+ } ( $from, $to );
- $self->{session}->set( $login . ':usage' => $usage );
-}
+ if ( ! -e $from_path ) {
+ warn "SKIPPED $from_path: $!";
+ next;
+ }
-sub _file_key {
- my $data = shift;
- #md5_base64( $data->{login} . '/' . $data->{file} );
- $data->{login} . ':' . $data->{file};
-}
+ warn "MV $from_path -> $to_path";
+ mkbasedir $to_path;
+ rename $from_path, $to_path;
+
+ my $md5 = $self->md5sum($data)->get( $from_dir . $from );
+ if ( ! $md5 ) {
+ warn "ERROR: no md5sum $from_dir $from " unless $md5;
+ next;
+ }
-sub file_set {
- my ($self,$data) = @_;
- $self->json_set( 'files', _file_key($data), $data );
-}
+ $self->md5sum($data)->out( $from_dir . $from );
+ $self->md5sum($data)->put( $from_dir . $to => $md5 );
+ $self->md5sum_close($data);
-sub file_get {
- my ($self,$data) = @_;
- $self->json_get( 'files', _file_key($data) );
-}
+ warn "$md5 moved to $from_dir $to";
+ }
-sub modify_file {
- my ( $self,$data ) = @_;
+ return 0; # skip dedup
+ }
+
+=cut
- if ( my $old = $self->file_get( $data ) ) {
- $self->usage_decr( $data );
+# the dot is escaped so that only a literal ".sync" directory matches;
+# an unescaped "." would also swallow names such as "async/" or "xsync/"
+ if ( $data->{file} =~ m{^(.*/)?\.sync/} ) {
+ # ignore .sync/ files from client
+ return 0;
}
- $self->new_file($data);
+ #return $file->{size} > 4096 ? 1 : 0; # FIXME
+ return 1; # dedup
}
+# never called by rsync directly!
+# On the new side of this change the body only unpacks its arguments:
+# the file_set call is commented out and the usage accounting is gone.
sub new_file {
my ( $self,$data ) = @_;
- $self->file_set($data);
- $self->usage_incr($data);
+# $self->file_set($data);
}
-sub remove_file {
+# client doesn't issue --delete
+#
+# Account for a file that rsync reported as deleted.
+#
+# Arguments: $data - hash ref of the transferred item; $data->{login} and
+#                    $data->{file} are read here, and $data is handed to
+#                    md5_get to look up the file's md5.
+#
+# When an md5 is known and the pool file "$self->{md5pool}/$md5" has a single
+# link left and still belongs to the user, a negative-size 'removed' record
+# is appended for that user and the pool file is chown-ed to the 'md5'
+# account.  A 'delete' record is appended to the 'md5sum' meta log whenever
+# an md5 was found.
+#
+# stat returns an empty list when the pool file is missing; that case is
+# detected and logged instead of comparing undefined values.
+sub removed_file {
my ( $self, $data ) = @_;
- $self->usage_decr( $data );
- my $k = _file_key $data;
- $self->{files}->delete( $k );
+
+    my $md5 = $self->md5_get( $data );
+    return unless $md5; # directories don't have md5sums
+    my $path = $self->{md5pool} . '/' . $md5;
+    # only link count, owner, group and size are needed
+    my ( $nlink, $uid, $gid, $size ) = ( stat($path) )[ 3, 4, 5, 7 ];
+
+    my $user = $self->{api}->user_info($data->{login});
+
+    if ( ! defined $nlink ) {
+        warn "ERROR missing pool file $path";
+    } elsif ( $nlink == 1 && $uid == $user->{uid} ) {
+        $self->append( $user, 'removed', -$size, $uid, $data->{file} );
+        my $id = getpwnam 'md5';
+        chown $id,$gid, $path;
+        warn "# chown $id $gid $path";
+    }
+
+    $self->api->append_meta('md5sum', $user, 'delete', $data->{file} );
}
sub make_dir {
warn "# new_link ",dump $data;
+=for removed
+
if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
my ( $dir, $to, $name ) = ( $1, $2, $3 );
- my $path = "users/$data->{login}/blob/" . $data->{file};
+ my $path = $self->blob_path($data);
my $link_to = readlink $path;
warn "$link_to";
if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
}
$s .= $link_to;
- my $d = "users/$to/blob";
- if ( ! -e $d ) {
- warn "ERROR: no to user $to in $d";
- return;
- }
- $d .= "/$name";
+ my $d = $self->blob_path({
+ pid => $data->{pid},
+ file => $name
+ });
# $name can contain directories so we must create them
- my $to_dir = $d;
- $to_dir =~ s{/[^/]+$}{};
- make_path $to_dir if ! -e $to_dir;
+ mkbasedir $d;
if ( ! -e $s ) {
warn "ERROR: can't find source $s";
my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
- my $origin = $self->file_get({
- login => $l,
- file => $f,
- });
- $self->new_file($origin);
- warn "INFO: sent file ",dump($origin);
+# my $origin = $self->file_get({
+# login => $l,
+# file => $f,
+# });
+# $self->new_file($origin);
+ warn "INFO: sent file ",dump($l,$f);
+ my $md5 = $self->md5sum($data)->get($s);
+ $self->md5sum({ login => $to })->put($d => $md5 );
+ # FIXME broken!
}
warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
}
}
+
+=cut
+
+}
+
+# Remember which user an rsync process belongs to.
+#
+# Everything from the first "@" onwards is cut from $login, the remainder is
+# looked up through the API object's user_info, and the result is stored in
+# $self->{pid}->{$pid}.
+sub init_pid_login {
+    my $self  = shift;
+    my $pid   = shift;
+    my $login = shift;
+
+    ( my $account = $login ) =~ s/\@.+//;
+    $self->{pid}->{$pid} = $self->{api}->user_info($account);
+
+    warn "created $pid";
+}
+
+# Forget the per-process entry stored under $self->{pid}->{$pid}.
+sub cleanup_pid {
+    my $self = shift;
+    my $pid  = shift;
+
+    delete $self->{pid}->{$pid};
+
+    warn "removed $pid";
+}
+
+# Dispatch one rsync log line.
+#
+# A "[pid] rsync <word> <module> <word> <login>" line registers the pid via
+# init_pid_login; a "[pid] sent ... bytes received ... bytes" line drops it
+# via cleanup_pid; any other line is only echoed with warn.
+sub rsync_log {
+    my ( $self, $data ) = @_;
+
+    # the second capture (module name) is matched but not used
+    if ( my ( $pid, undef, $login ) =
+        $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ )
+    {
+        return $self->init_pid_login( $pid, $login );
+    }
+
+    if ( my ($pid) = $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
+        return $self->cleanup_pid( $pid );
+    }
+
+    warn "## rsync_log $data";
+}
+
+# Build the on-disk path of a file for the rsync process in $data->{pid}.
+#
+# The base directory is the 'dir' value stored for that pid.  If there is
+# none, the pid is re-registered from $data->{login} and the lookup is
+# retried; the sub dies when that still yields no directory.
+# $path, when defined, is appended instead of $data->{file}.
+sub blob_path {
+    my ( $self, $data, $path ) = @_;
+
+    my $pid  = $data->{pid};
+    my $base = $self->{pid}->{$pid}->{dir};
+
+    unless ($base) {
+        warn "ERROR: $data->{pid} not found, possible restart?";
+        $self->init_pid_login( $pid, $data->{login} );
+        $base = $self->{pid}->{$pid}->{dir}
+            or die "no dir for ", dump( $self->{pid}->{$pid} );
+    }
+
+    my $relative = defined $path ? $path : $data->{file};
+    return $base . '/' . $relative;
}
-sub transfer {
+
+sub rsync_transfer {
my ( $self,$data ) = @_;
- my $blob = "users/$data->{login}/blob";
- my $path = "$blob/$data->{file}";
+ my $path = $self->blob_path($data);
if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
my $type = $1;
if ( $type eq 'f' ) {
- $self->modify_file( $data );
+ $self->modify_file( $data ) && # selective dedup
$self->dedup( $data, $path );
} elsif ( $type eq 'd' ) {
$self->make_dir( $data );
die "unknown type $type ", dump $data;
}
} elsif ( $data->{itemize} =~ m/\*deleting/ ) {
- $self->remove_file($data);
+ $self->removed_file($data);
+ } else {
+ warn "IGNORED ",dump($data) if $ENV{DEBUG};
}
return $data;
}
+# Thin wrapper: forwards all remaining arguments to the API object's append.
+sub append {
+    my $self = shift;
+    my $api  = $self->{api};
+    return $api->append(@_);
+}
+
+# Link a received file into the md5 pool, or replace it with the pooled copy.
+#
+# Arguments: $data - hash ref; $data->{md5} is required (dies without it),
+#                    $data->{login} and $data->{file} are used for the
+#                    accounting records, and blob_path($data) gives the
+#                    file's location.
+# Dies when $self->{md5pool} is not set.  Warns and returns early when the
+# file itself is missing.
sub md5pool {
- my ( $path, $md5 ) = @_;
+ my ( $self, $data ) = @_;
- my $pool = 'md5'; # FIXME sharding?
+ my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
mkdir $pool unless -e $pool;
- if ( -e "$pool/$md5" ) {
+ my $md5 = $data->{md5} || die "no md5 in ",dump $data;
+ my $path = $self->blob_path($data);
+
+ if ( ! -e $path ) {
+ warn "ERROR missing path $path";
+ return;
+ }
+
+ my $pool_md5 = "$pool/$md5";
+
+ if ( -e $pool_md5 ) {
warn "dedup hit $md5 $path\n";
+
+# stat fields 4 and 7 are the owner uid and the size of the pooled file
+ my ($pool_uid,$pool_size) = (stat($pool_md5))[4,7];
+ my $user = $self->{api}->user_info( $data->{login} );
+
+# pooled copy belongs to somebody else: hand it to the md5 account (unless it
+# already is there), log 'dedup-steal' for the previous owner, then log
+# 'dedup' for the current user
+ if ( $pool_uid != $user->{uid} ) {
+ if ( $pool_uid != $self->{api}->{md5}->{uid} ) {
+ chown $self->{api}->{md5}->{uid}, $self->{api}->{md5}->{gid}, $pool_md5;
+ chmod oct("0444"), $pool_md5;
+# NOTE(review): user_info gets a numeric uid here but a login elsewhere in
+# this file - confirm that it accepts both
+ my $steal_user = $self->{api}->user_info( $pool_uid );
+ $self->append( $steal_user, 'dedup-steal', $pool_size, $pool_uid, $data->{file} );
+ }
+ $self->append( $user, 'dedup', $pool_size, $pool_uid, $data->{file} );
+ }
+
my $dedup = $path . '.dedup';
rename $path, $dedup;
link "$pool/$md5", $path;
unlink $dedup;
} else {
link $path, "$pool/$md5";
+ warn "dedup +++ $md5 $path";
}
+
+# both branches end by recording the md5 for the path and appending it to
+# the 'md5sum' meta log
+ $self->md5_set( $path => $md5 );
+
+ $self->api->append_meta('md5sum', $data->{login}, $md5, $data->{file} );
}
my $empty_md5 = " " x 32;
my ( $self, $data, $path ) = @_;
if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
- my $dir = $1;
+ my $dir = $1 || '';
my $imported = 0;
warn "IMPORT ", $data->{file}, "\n";
open(my $md5sum, '<', $path);
while(<$md5sum>) {
chomp;
my ( $md5, $file ) = split(/\s+/,$_,2);
- if ( ! -e "md5/$md5" ) {
+ if ( ! -e "$self->{md5pool}/$md5" ) {
warn "MISSING $md5 $file\n";
next;
}
- my $new = "users/$data->{login}/blob/$dir$file";
- if ( ! -e $new ) {
- # create path from md5sum file
- my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
- make_path $only_dir unless -d $only_dir;
- $imported += link "md5/$md5", $new;
- my $fake = {
- login => $data->{login},
- host => $data->{host},
- file => $dir . $file,
- md5 => $md5,
- size => -s $new,
- };
- $self->new_file($fake);
- warn "import from $path ",dump($fake);
+ my $new = {
+ login => $data->{login},
+ pid => $data->{pid},
+ file => "$dir$file",
+ md5 => $md5,
+ };
+ my $new_path = $self->blob_path($new);
+ if ( ! -e $new_path ) {
+ $self->{api}->send_file( 'md5' => $md5, $data->{login}, "$dir$file" );
+ $self->md5pool( $new );
} else {
- md5pool $new => $md5;
+ $self->md5pool( $new );
}
}
- print "INFO imported $imported files from ",dump($data);
+ warn "INFO imported $imported files from ",dump($data);
+
+ return; # don't put md5sum files into pool
}
if ( $data->{md5} ne $empty_md5 ) {
- md5pool $path => $data->{md5};
+ $self->md5pool( $data );
} else {
-
+ warn "empty md5", dump $data;
}
}