use warnings;
use strict;
+use lib 'lib';
+use base qw(CloudStore::MD5sum);
+use CloudStore::API;
+
use autodie;
use JSON::XS;
use File::Path qw(make_path);
use Digest::MD5 qw(md5_base64);
use Data::Dump qw(dump);
use Carp qw(confess);
-use TokyoCabinet;
use WarnColor;
sub new {
- my $class = shift;
+ my ($class,$group) = @_;
- my $self = {@_};
+ my $self = {
+ api => CloudStore::API->new( $group ),
+ };
bless $self, $class;
- die "no dir" unless $self->{dir};
- $self->{md5pool} = $self->{dir} . '/md5';
+ $self->{md5pool} = $self->{api}->{md5}->{dir};
warn "# new ",dump $self if $ENV{DEBUG};
return $self;
}
-sub user_set {
- my ( $self,$data ) = @_;
-}
-
-sub user_get {
- my ( $self,$data ) = @_;
-}
+sub api { $_[0]->{api} }
sub mkbasedir {
my $dir = shift;
sub modify_file {
my ( $self,$data ) = @_;
+=for removed
+
if ( $data->{file} =~ m{^(.*/)?.sync/send/([^/]+)$} ) {
my $from_dir = $1;
warn "SEND $2 from $from_dir\n";
s/[\n\r]+$//;
if ( m/^DELETED\#(.+)$/ ) {
- my $path = $from_dir . $1;
+ my $path = $self->blob_path($data => $from_dir . $1 );
if ( -e $path ) {
- unlink $path && warn "unlink $path\n";
+ warn "UNLINK $path";
+ -d $path ? rmdir $path : unlink $path || warn "ERROR: unlink $path $!";
+ next;
} else {
- warn "MISSING $path to unlink\n";
+ warn "MISSING $path to unlink";
next;
}
} elsif ( ! /^(MOVED|RENAMED)\#/ ) {
$self->md5sum($data)->out( $from_dir . $from );
$self->md5sum($data)->put( $from_dir . $to => $md5 );
+ $self->md5sum_close($data);
warn "$md5 moved to $from_dir $to";
}
return 0; # skip dedup
}
+=cut
+
+ if ( $data->{file} =~ m{^(.*/)?.sync/} ) {
+ # ignore .sync/ files from client
+ return 0;
+ }
+
#return $file->{size} > 4096 ? 1 : 0; # FIXME
return 1; # dedup
}
# $self->file_set($data);
}
-sub remove_file {
+# client doesn't issue --delete
+sub removed_file {
my ( $self, $data ) = @_;
- my $md5 = $self->md5sum($data)->get( $data->{file} );
+=for removed
+ my $path = $self->blob_path( $data );
+ my $md5 = $self->md5_get( $path );
return unless $md5; # directories don't have md5sums
my $path = $self->{md5pool} . '/' . $md5;
my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
$atime,$mtime,$ctime,$blksize,$blocks)
= stat($path);
- if ( $nlink == 1 ) {
+
+ my $user = $self->{api}->user_info($data->{login});
+
+ if ( $nlink == 1 && $uid == $user->{uid} ) {
+ $self->append( $user, 'removed', -$size, $uid, $data->{file} );
my $id = getpwnam 'md5';
chown $id,$gid, $path;
warn "# chown $id $gid $path";
}
- $self->md5sum($data)->out( $data->{file} );
+ $self->api->append_meta('md5sum', $user, 'delete', $data->{file} );
+=cut
+
}
sub make_dir {
warn "# new_link ",dump $data;
+=for removed
+
if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
my ( $dir, $to, $name ) = ( $1, $2, $3 );
my $path = $self->blob_path($data);
});
# $name can contain directories so we must create them
- my $to_dir = $d;
- $to_dir =~ s{/[^/]+$}{};
- make_path $to_dir if ! -e $to_dir;
+ mkbasedir $d;
if ( ! -e $s ) {
warn "ERROR: can't find source $s";
warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
}
}
-}
-
-sub md5sum {
- my ( $self, $data ) = @_;
- my $login = $data->{login} || confess "missing login in ",dump $data;
+=cut
- return $self->{md5sum}->{$login} if exists $self->{md5sum}->{$login};
+}
- my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
- getpwnam $login;
+sub init_pid_login {
+ my ( $self, $pid, $login ) = @_;
- my $md5_path = "$dir/.md5";
+ $login =~ s/\@.+//;
+ $self->{pid}->{$pid} = $self->{api}->user_info($login);
- my $db = TokyoCabinet::HDB->new();
- $db->open($md5_path, $db->OWRITER | $db->OCREAT)
- or die "can't open $md5_path: ",$db->errmsg( $db->ecode );
+ warn "created $pid";
+}
- warn "open $md5_path";
+sub cleanup_pid {
+ my ( $self, $pid ) = @_;
- $self->{md5sum}->{$login} = $db;
- return $db;
+ delete $self->{pid}->{$pid};
+ warn "removed $pid";
}
sub rsync_log {
my ( $self, $data ) = @_;
if ( $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) {
my ( $pid, $module, $login ) = ( $1, $2, $3 );
-
- $login =~ s/\@.+//;
- my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
- getpwnam $login;
-
- $self->{pid}->{$pid} = {
- login => $login,
- uid => $uid,
- gid => $gid,
- email => $email,
- dir => $dir,
- shell => $shell,
- };
-
- warn "created $pid";
-
+ $self->init_pid_login( $pid, $login );
} elsif ( $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
my $pid = $1;
-
- foreach my $login ( keys %{ $self->{md5sum} } ) {
- $self->{md5sum}->{$login}->close;
- warn "close md5sum $login";
- }
- delete $self->{md5sum};
-
- delete $self->{pid}->{$pid};
- warn "removed $pid";
-warn dump $self;
-
+ $self->cleanup_pid( $pid );
} else {
-# warn "## rsync_log $data";
+ warn "## rsync_log $data" if $ENV{DEBUG};
}
}
sub blob_path {
- my ( $self, $data ) = @_;
- my $blob = $self->{pid}->{ $data->{pid} }->{dir} || die "no dir for $data->{pid} in ",dump( $self->{pid} );
- $blob .= '/' . $data->{file};
+ my ( $self, $data, $path ) = @_;
+ my $blob = $self->{pid}->{ $data->{pid} }->{dir};
+ if ( ! $blob ) {
+ warn "ERROR: $data->{pid} not found, possible restart?";
+ $self->init_pid_login( $data->{pid}, $data->{login} );
+ $blob = $self->{pid}->{ $data->{pid} }->{dir} || die "no dir for ", dump( $self->{pid}->{ $data->{pid} } );
+ }
+ $blob .= '/' . ( defined $path ? $path : $data->{file} );
return $blob;
}
} else {
die "unknown type $type ", dump $data;
}
+ $self->api->refresh_file_list( $data->{login} );
} elsif ( $data->{itemize} =~ m/\*deleting/ ) {
- $self->remove_file($data);
+ $self->removed_file($data);
+ $self->api->refresh_file_list( $data->{login} );
} else {
warn "IGNORED ",dump($data) if $ENV{DEBUG};
}
return $data;
}
+sub append {
+ my $self = shift @_;
+ $self->{api}->append( @_ );
+}
+
sub md5pool {
my ( $self, $data ) = @_;
my $md5 = $data->{md5} || die "no md5 in ",dump $data;
my $path = $self->blob_path($data);
- if ( -e "$pool/$md5" ) {
+ if ( ! -e $path ) {
+ warn "ERROR missing path $path";
+ return;
+ }
+
+ my $pool_md5 = "$pool/$md5";
+
+ if ( -e $pool_md5 ) {
warn "dedup hit $md5 $path\n";
+
+ my ($pool_uid,$pool_size) = (stat($pool_md5))[4,7];
+ my $user = $self->{api}->user_info( $data->{login} );
+
+ if ( $pool_uid != $user->{uid} ) {
+ if ( $pool_uid != $self->{api}->{md5}->{uid} ) {
+ chown $self->{api}->{md5}->{uid}, $self->{api}->{md5}->{gid}, $pool_md5;
+ chmod oct("0444"), $pool_md5;
+ my $steal_user = $self->{api}->user_info( $pool_uid );
+ $self->append( $steal_user, 'dedup-steal', $pool_size, $pool_uid, $data->{file} );
+ }
+ $self->append( $user, 'dedup', $pool_size, $pool_uid, $data->{file} );
+ }
+
my $dedup = $path . '.dedup';
rename $path, $dedup;
link "$pool/$md5", $path;
unlink $dedup;
- # FIXME fix perms?
} else {
link $path, "$pool/$md5";
warn "dedup +++ $md5 $path";
}
- $self->md5sum($data)->put( $data->{file} => $md5 );
+ $self->md5_set( $path => $md5 );
+
+ $self->api->append_meta('md5sum', $data->{login}, $md5, $data->{file} );
}
my $empty_md5 = " " x 32;
my ( $self, $data, $path ) = @_;
if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
- my $dir = $1;
+ my $dir = $1 || '';
my $imported = 0;
warn "IMPORT ", $data->{file}, "\n";
open(my $md5sum, '<', $path);
while(<$md5sum>) {
chomp;
my ( $md5, $file ) = split(/\s+/,$_,2);
- if ( ! -e "$self->{md5path}/$md5" ) {
+ if ( ! $file ) {
+ warn "IGNORE $md5 without file\n";
+ next;
+ }
+ if ( ! -e "$self->{md5pool}/$md5" ) {
warn "MISSING $md5 $file\n";
next;
}
my $new = {
+ login => $data->{login},
pid => $data->{pid},
file => "$dir$file",
md5 => $md5,
};
my $new_path = $self->blob_path($new);
if ( ! -e $new_path ) {
- # create path from md5sum file
- my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
- make_path $only_dir unless -d $only_dir;
- $imported += link "$self->{md5path}/$md5", $new_path;
- $self->new_file($new);
- warn "import from $path ",dump($new);
+ $self->{api}->send_file( 'md5' => $md5, $data->{login}, "$dir$file" );
$self->md5pool( $new );
} else {
$self->md5pool( $new );
}
}
- print "INFO imported $imported files from ",dump($data);
+ warn "INFO imported $imported files from ",dump($data);
+
+ return; # don't put md5sum files into pool
}
if ( $data->{md5} ne $empty_md5 ) {