use Digest::MD5 qw(md5_base64);
use Data::Dump qw(dump);
use Carp qw(confess);
-use BerkeleyDB;
+use TokyoCabinet;
use WarnColor;
my ( $self,$data ) = @_;
}
+sub mkbasedir {
+ my $dir = shift;
+ $dir =~ s{/[^/]+$}{}; # strip filename
+ mkdir $dir unless -e $dir;
+}
+
sub modify_file {
my ( $self,$data ) = @_;
-# if ( my $old = $self->file_get( $data ) ) {
-# $self->usage_decr( $data );
-# }
+ if ( $data->{file} =~ m{^(.*/).sync/send/([^/]+)$} ) {
+ my $from_dir = $1;
+ warn "SEND $2 from $from_dir\n";
+ my $sent_files;
+ open(my $send, '<', $self->blob_path($data) );
+ while(<$send>) {
+ s/[\n\r]+$//;
+
+ my ( $to, $file ) = split(/\s+/,$_,2);
+ my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
+ getpwnam $to;
+
+ my $from = $data;
+ $from->{file} = $from_dir . $file;
+ my $from_path = $self->blob_path($from);
+
+ if ( ! -r $from_path ) {
+ warn "ERROR: $from_path: $!";
+ next;
+ }
+
+ my $to_path = "$dir/received/$file";
+ mkbasedir $to_path;
+
+ warn "SEND $from_path -> $to_path\n";
+ unlink $to_path if -e $to_path; # FIXME why we need this?
+ $sent_files->{$to} += link $from_path, $to_path;
+ # FIXME cross-shard
+ my $md5 = $self->md5sum($data)->get( $from_dir . $file ) || warn "no md5 for $from_dir$file";
+ $self->md5sum({login => $to})->put( "/received/$file" => $md5 );
+ }
+
+ warn "SENT ",dump $sent_files;
+
+ return 0; # skip dedup
+ } elsif ( $data->{file} =~ m{^(.*/).sync/pending/([^/]+)$} ) {
+ my $from_dir = $1;
+ warn "PENDIG $2 from $from_dir";
+ open(my $pend, '<', $self->blob_path($data) );
+ while(<$pend>) {
+ s/[\n\r]+$//;
+warn $_;
+ if ( ! /^MOVED\#/ ) {
+ warn "skip $_\n";
+ next;
+ }
+
+ my ( undef, $from, $to ) = split(/\#/,$_,3);
+
+ my ( $from_path, $to_path ) = map {
+ my $tmp = $data;
+ $tmp->{file} = $from_dir . $_;
+ $self->blob_path($tmp);
+ } ( $from, $to );
+
+ warn "MV $from_path -> $to_path";
+ mkbasedir $to_path;
+ rename $from_path, $to_path;
+
+ my $md5 = $self->md5sum($data)->get( $from_dir . $from );
+ die "no md5sum $from_dir $from " unless $md5;
+
+ $self->md5sum($data)->out( $from_dir . $from );
+ $self->md5sum($data)->put( $from_dir . $to => $md5 );
- $self->new_file($data);
+ warn "$md5 moved to $from_dir $to";
+ }
+
+ return 0; # skip dedup
+ }
+
+ #return $file->{size} > 4096 ? 1 : 0; # FIXME
+ return 1; # dedup
}
+# never called by rsync directly!
sub new_file {
my ( $self,$data ) = @_;
# $self->file_set($data);
-# $self->usage_incr($data);
}
sub remove_file {
my ( $self, $data ) = @_;
-# $self->usage_decr( $data );
+
+ my $md5 = $self->md5sum($data)->get( $data->{file} );
+ return unless $md5; # directories don't have md5sums
+ my $path = $self->{md5pool} . '/' . $md5;
+ my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
+ $atime,$mtime,$ctime,$blksize,$blocks)
+ = stat($path);
+ if ( $nlink == 1 ) {
+ my $id = getpwnam 'md5';
+ chown $id,$gid, $path;
+ warn "# chown $id $gid $path";
+ }
}
sub make_dir {
# });
# $self->new_file($origin);
warn "INFO: sent file ",dump($l,$f);
- my $md5sum = $self->md5sum($data);
-
- my $md5 = $md5sum->{$s} || die "no md5 for $s";
- $md5sum->{$d} = $md5; # FIXME broken!
+ my $md5 = $self->md5sum($data)->get($s);
+ $self->md5sum({ login => $to })->put($d => $md5 );
+ # FIXME broken!
}
}
}
-our $md5_login;
sub md5sum {
my ( $self, $data ) = @_;
- if ( exists $md5_login->{$data->{login}} ) {
- return $md5_login->{$data->{login}};
- } elsif ( my $login = $data->{login} ) {
+ my $login = $data->{login} || confess "missing login in ",dump $data;
- my $md5_path = $self->{dir} || die "no dir?";
- $login =~ s/^u//;
- $md5_path .= "/$login/.md5.db";
+ return $self->{md5sum}->{$login} if exists $self->{md5sum}->{$login};
- my %md5;
- my $db = tie %md5, 'BerkeleyDB::Hash',
- -Filename => $md5_path,
- -Flags => DB_CREATE,
- ;
+ my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
+ getpwnam $login;
- return $md5_login->{$login} = \%md5;
- } else {
- confess "can't open md5sum";
- }
+ my $md5_path = "$dir/.md5";
+
+ my $db = TokyoCabinet::HDB->new();
+ $db->open($md5_path, $db->OWRITER | $db->OCREAT)
+ or die "can't open $md5_path: ",$db->errmsg( $db->ecode );
+
+ warn "open $md5_path";
+
+ $self->{md5sum}->{$login} = $db;
+ return $db;
}
sub rsync_log {
} elsif ( $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
my $pid = $1;
- untie $md5_login->{ $self->{$pid}->{login} } && warn "untie $pid";
+
+ foreach my $login ( keys %{ $self->{md5sum} } ) {
+ $self->{md5sum}->{$login}->close;
+ warn "close md5sum $login";
+ }
+ delete $self->{md5sum};
+
delete $self->{pid}->{$pid};
warn "removed $pid";
+warn dump $self;
+
} else {
# warn "## rsync_log $data";
}
my $type = $1;
if ( $type eq 'f' ) {
- $self->modify_file( $data );
+ $self->modify_file( $data ) && # selective dedup
$self->dedup( $data, $path );
} elsif ( $type eq 'd' ) {
$self->make_dir( $data );
}
} elsif ( $data->{itemize} =~ m/\*deleting/ ) {
$self->remove_file($data);
+ } else {
+ warn "IGNORED ",dump($data);
}
return $data;
}
# FIXME fix perms?
} else {
link $path, "$pool/$md5";
+ warn "dedup +++ $md5 $path";
}
- my $md5sum = $self->md5sum($data);
- $md5sum->{ $data->{file} } = $md5;
+ $self->md5sum($data)->put( $data->{file} => $md5 );
}
my $empty_md5 = " " x 32;