don't die on md5sum without filename
[cloudstore.git] / lib / CloudStore / Store.pm
index c879681..1956a73 100644 (file)
@@ -2,6 +2,10 @@ package CloudStore::Store;
 use warnings;
 use strict;
 
+use lib 'lib';
+use base qw(CloudStore::MD5sum);
+use CloudStore::API;
+
 use autodie;
 use JSON::XS;
 use File::Path qw(make_path);
@@ -9,52 +13,169 @@ use File::Slurp qw();
 use Digest::MD5 qw(md5_base64);
 use Data::Dump qw(dump);
 use Carp qw(confess);
-use BerkeleyDB;
 
 use WarnColor;
 
 sub new {
-       my ($class) = @_;
+       my ($class,$group) = @_;
 
-       my $self = {};
+       my $self = {
+               api => CloudStore::API->new( $group ),
+       };
        bless $self, $class;
 
-       my %md5;
-       $self->{db} = tie %md5, 'BerkeleyDB::Hash', -Filename => '/tmp/md5.db', -Flags => DB_CREATE;
-       $self->{md5} = \%md5;
+       $self->{md5pool} = $self->{api}->{md5}->{dir};
 
        warn "# new ",dump $self if $ENV{DEBUG};
 
        return $self;
 }
 
-sub user_set {
-       my ( $self,$data ) = @_;
-}
+sub api { $_[0]->{api} }
 
-sub user_get {
-       my ( $self,$data ) = @_;
+sub mkbasedir {
+       my $dir = shift;
+       $dir =~ s{/[^/]+$}{}; # strip filename
+       mkdir $dir unless -e $dir;
 }
 
 sub modify_file {
        my ( $self,$data ) = @_;
 
-#      if ( my $old = $self->file_get( $data ) ) {
-#              $self->usage_decr( $data );
-#      }
+=for removed
+
+       if ( $data->{file} =~ m{^(.*/)?.sync/send/([^/]+)$} ) {
+               my $from_dir = $1;
+               warn "SEND $2 from $from_dir\n";
+               my $sent_files;
+               open(my $send, '<', $self->blob_path($data) );
+               while(<$send>) {
+                       s/[\n\r]+$//;
+
+                       my ( $to, $file ) = split(/\s+/,$_,2);
+                       my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
+                               getpwnam $to;
+
+                       my $from = $data;
+                       $from->{file} = $from_dir . $file;
+                       my $from_path = $self->blob_path($from);
+
+                       if ( ! -r $from_path ) {
+                               warn "ERROR: $from_path: $!";
+                               next;
+                       }
 
-       $self->new_file($data);
+                       my $to_path = "$dir/received/$file";
+                       mkbasedir $to_path;
+
+                       warn "SEND $from_path -> $to_path\n";
+                       unlink $to_path if -e $to_path; # FIXME why we need this?
+                       $sent_files->{$to} += link $from_path, $to_path;
+                       # FIXME cross-shard
+                       my $md5 = $self->md5sum($data)->get( $from_dir . $file ) || warn "no md5 for $from_dir$file";
+                       $self->md5sum({login => $to})->put( "/received/$file" => $md5 );
+               }
+
+               warn "SENT ",dump $sent_files;
+
+               return 0; # skip dedup
+       } elsif ( $data->{file} =~ m{^(.*/)?.sync/pending/([^/]+)$} ) {
+               my $from_dir = $1;
+               warn "PENDIG $2 from $from_dir";
+               open(my $pend, '<', $self->blob_path($data) );
+               while(<$pend>) {
+                       s/[\n\r]+$//;
+
+                       if ( m/^DELETED\#(.+)$/ ) {
+                               my $path = $self->blob_path($data => $from_dir . $1 );
+                               if ( -e $path ) {
+                                       warn "UNLINK $path";
+                                       -d $path ? rmdir $path : unlink $path || warn "ERROR: unlink $path $!";
+                                       next;
+                               } else {
+                                       warn "MISSING $path to unlink";
+                                       next;
+                               }
+                       } elsif ( ! /^(MOVED|RENAMED)\#/ ) {
+                               warn "skip $_\n";
+                               next;
+                       }
+
+                       my ( undef, $from, $to ) = split(/\#/,$_,3);
+
+                       my ( $from_path, $to_path ) = map {
+                               my $tmp = $data;
+                               $tmp->{file} = $from_dir . $_;
+                               $self->blob_path($tmp);
+                       } ( $from, $to );
+
+                       if ( ! -e $from_path ) {
+                               warn "SKIPPED $from_path: $!";
+                               next;
+                       }
+
+                       warn "MV $from_path -> $to_path";
+                       mkbasedir $to_path;
+                       rename $from_path, $to_path;
+       
+                       my $md5 = $self->md5sum($data)->get( $from_dir . $from );
+                       if ( ! $md5 ) {
+                               warn "ERROR: no md5sum $from_dir $from " unless $md5;
+                               next;
+                       }
+
+                       $self->md5sum($data)->out( $from_dir . $from );
+                       $self->md5sum($data)->put( $from_dir . $to => $md5 );
+                       $self->md5sum_close($data);
+
+                       warn "$md5 moved to $from_dir $to";
+               }
+
+               return 0; # skip dedup
+       }
+
+=cut
+
+       if ( $data->{file} =~ m{^(.*/)?.sync/} ) {
+               # ignore .sync/ files from client
+               return 0;
+       }
+
+       #return $file->{size} > 4096 ? 1 : 0; # FIXME
+       return 1; # dedup
 }
 
+# never called by rsync directly!
 sub new_file {
        my ( $self,$data ) = @_;
 #      $self->file_set($data);
-#      $self->usage_incr($data);
 }
 
-sub remove_file {
+# client doesn't issue --delete
+sub removed_file {
        my ( $self, $data ) = @_;
-#      $self->usage_decr( $data );
+
+=for removed
+       my $path = $self->blob_path( $data );
+       my $md5 = $self->md5_get( $path );
+       return unless $md5; # directories don't have md5sums
+       my $path = $self->{md5pool} . '/' . $md5;
+       my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
+               $atime,$mtime,$ctime,$blksize,$blocks)
+                       = stat($path);
+
+       my $user = $self->{api}->user_info($data->{login});
+
+       if ( $nlink == 1 && $uid == $user->{uid} ) {
+               $self->append( $user, 'removed', -$size, $uid, $data->{file} );
+               my $id = getpwnam 'md5';
+               chown $id,$gid, $path;
+               warn "# chown $id $gid $path";
+       }
+
+       $self->api->append_meta('md5sum', $user, 'delete', $data->{file} );
+=cut
+
 }
 
 sub make_dir {
@@ -67,9 +188,11 @@ sub new_link {
 
        warn "# new_link ",dump $data;
 
+=for removed
+
        if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
                my ( $dir, $to, $name ) = ( $1, $2, $3 );
-               my $path = "users/$data->{login}/blob/" . $data->{file};
+               my $path = $self->blob_path($data);
                my $link_to = readlink $path;
                warn "$link_to";
                if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
@@ -82,17 +205,13 @@ sub new_link {
                        }
                        $s .= $link_to;
 
-                       my $d = "users/$to/blob";
-                       if ( ! -e $d ) {
-                               warn "ERROR: no to user $to in $d";
-                               return;
-                       }
-                       $d .= "/$name";
+                       my $d = $self->blob_path({
+                               pid => $data->{pid},
+                               file => $name
+                       });
 
                        # $name can contain directories so we must create them
-                       my $to_dir = $d;
-                       $to_dir =~ s{/[^/]+$}{};
-                       make_path $to_dir if ! -e $to_dir;
+                       mkbasedir $d;
 
                        if ( ! -e $s ) {
                                warn "ERROR: can't find source $s";
@@ -109,8 +228,9 @@ sub new_link {
 #                              });
 #                              $self->new_file($origin);
                                warn "INFO: sent file ",dump($l,$f);
-                               my $md5 = $self->{md5}->{$s} || die "no md5 for $s";
-                               $self->{md5}->{$d} = $md5;
+                               my $md5 = $self->md5sum($data)->get($s);
+                               $self->md5sum({ login => $to })->put($d => $md5 );
+                               # FIXME broken!
                        }
 
 
@@ -118,19 +238,63 @@ sub new_link {
                        warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
                }
        }
+
+=cut
+
+}
+
+sub init_pid_login {
+       my ( $self, $pid, $login ) = @_;
+
+       $login =~ s/\@.+//;
+       $self->{pid}->{$pid} = $self->{api}->user_info($login);
+
+       warn "created $pid";
+}
+
+sub cleanup_pid {
+       my ( $self, $pid ) = @_;
+
+       delete $self->{pid}->{$pid};
+       warn "removed $pid";
+}
+
+sub rsync_log {
+       my ( $self, $data ) = @_;
+       if ( $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) {
+               my ( $pid, $module, $login ) = ( $1, $2, $3 );
+               $self->init_pid_login( $pid, $login );
+       } elsif ( $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
+               my $pid = $1;
+               $self->cleanup_pid( $pid );
+       } else {
+               warn "## rsync_log $data" if $ENV{DEBUG};
+       }
+}
+
+sub blob_path {
+       my ( $self, $data, $path ) = @_;
+       my $blob = $self->{pid}->{ $data->{pid} }->{dir};
+       if ( ! $blob ) {
+               warn "ERROR: $data->{pid} not found, possible restart?";
+               $self->init_pid_login( $data->{pid}, $data->{login} );
+               $blob = $self->{pid}->{ $data->{pid} }->{dir} || die "no dir for ", dump( $self->{pid}->{ $data->{pid} } );
+       }
+       $blob .= '/' . ( defined $path ? $path : $data->{file} );
+       return $blob;
 }
 
-sub transfer {
+
+sub rsync_transfer {
        my ( $self,$data ) = @_;
 
-       my $blob = "users/$data->{login}/blob";
-        my $path = "$blob/$data->{file}";
+        my $path = $self->blob_path($data);
 
        if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
                my $type = $1;
 
                if ( $type eq 'f' ) {
-                       $self->modify_file( $data );
+                       $self->modify_file( $data ) && # selective dedup
                        $self->dedup( $data, $path );
                } elsif ( $type eq 'd' ) {
                        $self->make_dir( $data );
@@ -139,31 +303,65 @@ sub transfer {
                } else {
                        die "unknown type $type ", dump $data;
                }
+               $self->api->refresh_file_list( $data->{login} );
        } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
-               $self->remove_file($data);
+               $self->removed_file($data);
+               $self->api->refresh_file_list( $data->{login} );
+       } else {
+               warn "IGNORED ",dump($data) if $ENV{DEBUG};
        }
        return $data;
 }
 
+sub append {
+       my $self = shift @_;
+       $self->{api}->append( @_ );
+}
+
 sub md5pool {
-       my ( $self, $path, $md5 ) = @_;
+       my ( $self, $data ) = @_;
 
-       my $pool = 'md5'; # FIXME sharding?
+       my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
        mkdir $pool unless -e $pool;
 
-       if ( -e "$pool/$md5" ) {
+       my $md5 = $data->{md5} || die "no md5 in ",dump $data;
+       my $path = $self->blob_path($data);
+
+       if ( ! -e $path ) {
+               warn "ERROR missing path $path";
+               return;
+       }
+
+       my $pool_md5 = "$pool/$md5";
+
+       if ( -e $pool_md5 ) {
                warn "dedup hit $md5 $path\n";
+
+               my ($pool_uid,$pool_size) = (stat($pool_md5))[4,7];
+               my $user = $self->{api}->user_info( $data->{login} );
+
+               if ( $pool_uid != $user->{uid} ) {
+                       if ( $pool_uid != $self->{api}->{md5}->{uid} ) {
+                               chown $self->{api}->{md5}->{uid}, $self->{api}->{md5}->{gid}, $pool_md5;
+                               chmod oct("0444"), $pool_md5;
+                               my $steal_user = $self->{api}->user_info( $pool_uid );
+                               $self->append( $steal_user, 'dedup-steal', $pool_size, $pool_uid, $data->{file} );
+                       }
+                       $self->append( $user, 'dedup', $pool_size, $pool_uid, $data->{file} );
+               }
+
                my $dedup = $path . '.dedup';
                rename $path, $dedup;
                link "$pool/$md5", $path;
                unlink $dedup;
-               # FIXME fix perms?
        } else {
                link $path, "$pool/$md5";
+               warn "dedup +++ $md5 $path";
        }
 
-       $self->{md5}->{$path} = $md5;
-       warn "++ $md5 $path\n";
+       $self->md5_set( $path => $md5 );
+
+       $self->api->append_meta('md5sum', $data->{login}, $md5, $data->{file} );
 }
 
 my $empty_md5 = " " x 32;
@@ -172,41 +370,42 @@ sub dedup {
        my ( $self, $data, $path ) = @_;
 
        if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
-               my $dir = $1;
+               my $dir = $1 || '';
                my $imported = 0;
                warn "IMPORT ", $data->{file}, "\n";
                open(my $md5sum, '<', $path);
                while(<$md5sum>) {
                        chomp;
                        my ( $md5, $file ) = split(/\s+/,$_,2);
-                       if ( ! -e "md5/$md5" ) {
+                       if ( ! $file ) {
+                               warn "IGNORE $md5 without file\n";
+                               next;
+                       }
+                       if ( ! -e "$self->{md5pool}/$md5" ) {
                                warn "MISSING $md5 $file\n";
                                next;
                        }
-                       my $new = "users/$data->{login}/blob/$dir$file";
-                       if ( ! -e $new ) {
-                               # create path from md5sum file
-                               my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
-                               make_path $only_dir unless -d $only_dir;
-                               $imported += link "md5/$md5", $new;
-                               my $fake = {
-                                       login => $data->{login},
-                                       host => $data->{host},
-                                       file => $dir . $file,
-                                       md5 => $md5,
-                                       size => -s $new,
-                               };
-                               $self->new_file($fake);
-                               warn "import from $path ",dump($fake);
+                       my $new = {
+                               login => $data->{login},
+                               pid => $data->{pid},
+                               file => "$dir$file",
+                               md5 => $md5,
+                       };
+                       my $new_path = $self->blob_path($new);
+                       if ( ! -e $new_path ) {
+                               $self->{api}->send_file( 'md5' => $md5, $data->{login}, "$dir$file" );
+                               $self->md5pool( $new );
                        } else {
-                               $self->md5pool( $new => $md5 );
+                               $self->md5pool( $new );
                        }
                }
-               print "INFO imported $imported files from ",dump($data);
+               warn "INFO imported $imported files from ",dump($data);
+
+               return; # don't put md5sum files into pool
        }
 
        if ( $data->{md5} ne $empty_md5 ) {
-               $self->md5pool( $path => $data->{md5} );
+               $self->md5pool( $data );
        } else {
                warn "empty md5", dump $data;
        }