port md5 support to TokyoCabinet
[cloudstore.git] / lib / CloudStore / Store.pm
index f49785b..3560ac2 100644 (file)
@@ -9,7 +9,7 @@ use File::Slurp qw();
 use Digest::MD5 qw(md5_base64);
 use Data::Dump qw(dump);
 use Carp qw(confess);
-use BerkeleyDB;
+use TokyoCabinet;
 
 use WarnColor;
 
@@ -35,25 +35,110 @@ sub user_get {
        my ( $self,$data ) = @_;
 }
 
+sub mkbasedir {
+       my $dir = shift;
+       $dir =~ s{/[^/]+$}{}; # strip filename
+       mkdir $dir unless -e $dir;
+}
+
 sub modify_file {
        my ( $self,$data ) = @_;
 
-#      if ( my $old = $self->file_get( $data ) ) {
-#              $self->usage_decr( $data );
-#      }
+       if ( $data->{file} =~ m{^(.*/).sync/send/([^/]+)$} ) {
+               my $from_dir = $1;
+               warn "SEND $2 from $from_dir\n";
+               my $sent_files;
+               open(my $send, '<', $self->blob_path($data) );
+               while(<$send>) {
+                       s/[\n\r]+$//;
+
+                       my ( $to, $file ) = split(/\s+/,$_,2);
+                       my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
+                               getpwnam $to;
+
+                       my $from = $data;
+                       $from->{file} = $from_dir . $file;
+                       my $from_path = $self->blob_path($from);
+
+                       if ( ! -r $from_path ) {
+                               warn "ERROR: $from_path: $!";
+                               next;
+                       }
+
+                       my $to_path = "$dir/received/$file";
+                       mkbasedir $to_path;
+
+                       warn "SEND $from_path -> $to_path\n";
+                       unlink $to_path if -e $to_path; # FIXME why we need this?
+                       $sent_files->{$to} += link $from_path, $to_path;
+                       # FIXME cross-shard
+                       my $md5 = $self->md5sum($data)->get( $from_dir . $file ) || warn "no md5 for $from_dir$file";
+                       $self->md5sum({login => $to})->put( "/received/$file" => $md5 );
+               }
+
+               warn "SENT ",dump $sent_files;
+
+               return 0; # skip dedup
+       } elsif ( $data->{file} =~ m{^(.*/).sync/pending/([^/]+)$} ) {
+               my $from_dir = $1;
+               warn "PENDIG $2 from $from_dir";
+               open(my $pend, '<', $self->blob_path($data) );
+               while(<$pend>) {
+                       s/[\n\r]+$//;
+warn $_;
+                       if ( ! /^MOVED\#/ ) {
+                               warn "skip $_\n";
+                               next;
+                       }
+
+                       my ( undef, $from, $to ) = split(/\#/,$_,3);
+
+                       my ( $from_path, $to_path ) = map {
+                               my $tmp = $data;
+                               $tmp->{file} = $from_dir . $_;
+                               $self->blob_path($tmp);
+                       } ( $from, $to );
+
+                       warn "MV $from_path -> $to_path";
+                       mkbasedir $to_path;
+                       rename $from_path, $to_path;
+       
+                       my $md5 = $self->md5sum($data)->get( $from_dir . $from );
+                       die "no md5sum $from_dir $from " unless $md5;
+
+                       $self->md5sum($data)->out( $from_dir . $from );
+                       $self->md5sum($data)->put( $from_dir . $to => $md5 );
 
-       $self->new_file($data);
+                       warn "$md5 moved to $from_dir $to";
+               }
+
+               return 0; # skip dedup
+       }
+
+       #return $file->{size} > 4096 ? 1 : 0; # FIXME
+       return 1; # dedup
 }
 
+# never called by rsync directly!
 sub new_file {
        my ( $self,$data ) = @_;
 #      $self->file_set($data);
-#      $self->usage_incr($data);
 }
 
 sub remove_file {
        my ( $self, $data ) = @_;
-#      $self->usage_decr( $data );
+
+       my $md5 = $self->md5sum($data)->get( $data->{file} );
+       return unless $md5; # directories don't have md5sums
+       my $path = $self->{md5pool} . '/' . $md5;
+       my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
+               $atime,$mtime,$ctime,$blksize,$blocks)
+                       = stat($path);
+       if ( $nlink == 1 ) {
+               my $id = getpwnam 'md5';
+               chown $id,$gid, $path;
+               warn "# chown $id $gid $path";
+       }
 }
 
 sub make_dir {
@@ -106,10 +191,9 @@ sub new_link {
 #                              });
 #                              $self->new_file($origin);
                                warn "INFO: sent file ",dump($l,$f);
-                               my $md5sum = $self->md5sum($data);
-
-                               my $md5 = $md5sum->{$s} || die "no md5 for $s";
-                               $md5sum->{$d} = $md5; # FIXME broken!
+                               my $md5 = $self->md5sum($data)->get($s);
+                               $self->md5sum({ login => $to })->put($d => $md5 );
+                               # FIXME broken!
                        }
 
 
@@ -119,28 +203,26 @@ sub new_link {
        }
 }
 
-our $md5_login;
 sub md5sum {
        my ( $self, $data ) = @_;
 
-       if ( exists $md5_login->{$data->{login}} ) {
-               return $md5_login->{$data->{login}};
-       } elsif ( my $login = $data->{login} ) {
+       my $login = $data->{login} || confess "missing login in ",dump $data;
 
-               my $md5_path = $self->{dir} || die "no dir?";
-               $login =~ s/^u//;
-               $md5_path .= "/$login/.md5.db";
+       return $self->{md5sum}->{$login} if exists $self->{md5sum}->{$login};
 
-               my %md5;
-               my $db = tie %md5, 'BerkeleyDB::Hash',
-                       -Filename => $md5_path,
-                       -Flags => DB_CREATE,
-               ;
+       my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
+               getpwnam $login;
 
-               return $md5_login->{$login} = \%md5;
-       } else {
-               confess "can't open md5sum";
-       }
+       my $md5_path = "$dir/.md5";
+
+       my $db = TokyoCabinet::HDB->new();
+       $db->open($md5_path, $db->OWRITER | $db->OCREAT)
+       or die "can't open $md5_path: ",$db->errmsg( $db->ecode );
+
+       warn "open $md5_path";
+
+       $self->{md5sum}->{$login} = $db;
+       return $db;
 }
 
 sub rsync_log {
@@ -165,9 +247,17 @@ sub rsync_log {
 
        } elsif ( $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
                my $pid = $1;
-               untie $md5_login->{ $self->{$pid}->{login} } && warn "untie $pid";
+
+               foreach my $login ( keys %{ $self->{md5sum} } ) {
+                       $self->{md5sum}->{$login}->close;
+                       warn "close md5sum $login";
+               }
+               delete $self->{md5sum};
+
                delete $self->{pid}->{$pid};
                warn "removed $pid";
+warn dump $self;
+
        } else {
 #              warn "## rsync_log $data";
        }
@@ -190,7 +280,7 @@ sub rsync_transfer {
                my $type = $1;
 
                if ( $type eq 'f' ) {
-                       $self->modify_file( $data );
+                       $self->modify_file( $data ) && # selective dedup
                        $self->dedup( $data, $path );
                } elsif ( $type eq 'd' ) {
                        $self->make_dir( $data );
@@ -201,6 +291,8 @@ sub rsync_transfer {
                }
        } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
                $self->remove_file($data);
+       } else {
+               warn "IGNORED ",dump($data);
        }
        return $data;
 }
@@ -223,10 +315,10 @@ sub md5pool {
                # FIXME fix perms?
        } else {
                link $path, "$pool/$md5";
+               warn "dedup +++ $md5 $path";
        }
 
-       my $md5sum = $self->md5sum($data);
-       $md5sum->{ $data->{file} } = $md5;
+       $self->md5sum($data)->put( $data->{file} => $md5 );
 }
 
 my $empty_md5 = " " x 32;