X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FCloudStore%2FStore.pm;h=1956a732d8200b7dc7873d3c274792f8c65cab60;hb=e566496afecf10279429815772a6ca45eddfdb9d;hp=8ab613b1cc9dde58fb24f007597101a9fae95542;hpb=9a4938e52036d70276036e21d3ed8ddb21660772;p=cloudstore.git diff --git a/lib/CloudStore/Store.pm b/lib/CloudStore/Store.pm index 8ab613b..1956a73 100644 --- a/lib/CloudStore/Store.pm +++ b/lib/CloudStore/Store.pm @@ -2,6 +2,10 @@ package CloudStore::Store; use warnings; use strict; +use lib 'lib'; +use base qw(CloudStore::MD5sum); +use CloudStore::API; + use autodie; use JSON::XS; use File::Path qw(make_path); @@ -9,31 +13,25 @@ use File::Slurp qw(); use Digest::MD5 qw(md5_base64); use Data::Dump qw(dump); use Carp qw(confess); -use TokyoCabinet; use WarnColor; sub new { - my $class = shift; + my ($class,$group) = @_; - my $self = {@_}; + my $self = { + api => CloudStore::API->new( $group ), + }; bless $self, $class; - die "no dir" unless $self->{dir}; - $self->{md5pool} = $self->{dir} . '/md5'; + $self->{md5pool} = $self->{api}->{md5}->{dir}; warn "# new ",dump $self if $ENV{DEBUG}; return $self; } -sub user_set { - my ( $self,$data ) = @_; -} - -sub user_get { - my ( $self,$data ) = @_; -} +sub api { $_[0]->{api} } sub mkbasedir { my $dir = shift; @@ -44,6 +42,8 @@ sub mkbasedir { sub modify_file { my ( $self,$data ) = @_; +=for removed + if ( $data->{file} =~ m{^(.*/)?.sync/send/([^/]+)$} ) { my $from_dir = $1; warn "SEND $2 from $from_dir\n"; @@ -85,8 +85,18 @@ sub modify_file { open(my $pend, '<', $self->blob_path($data) ); while(<$pend>) { s/[\n\r]+$//; -warn $_; - if ( ! /^(MOVED|RENAMED)\#/ ) { + + if ( m/^DELETED\#(.+)$/ ) { + my $path = $self->blob_path($data => $from_dir . $1 ); + if ( -e $path ) { + warn "UNLINK $path"; + -d $path ? rmdir $path : unlink $path || warn "ERROR: unlink $path $!"; + next; + } else { + warn "MISSING $path to unlink"; + next; + } + } elsif ( ! /^(MOVED|RENAMED)\#/ ) { warn "skip $_\n"; next; } @@ -109,10 +119,14 @@ warn $_; rename $from_path, $to_path; my $md5 = $self->md5sum($data)->get( $from_dir . $from ); - die "no md5sum $from_dir $from " unless $md5; + if ( ! $md5 ) { + warn "ERROR: no md5sum $from_dir $from " unless $md5; + next; + } $self->md5sum($data)->out( $from_dir . $from ); $self->md5sum($data)->put( $from_dir . $to => $md5 ); + $self->md5sum_close($data); warn "$md5 moved to $from_dir $to"; } @@ -120,6 +134,13 @@ warn $_; return 0; # skip dedup } +=cut + + if ( $data->{file} =~ m{^(.*/)?.sync/} ) { + # ignore .sync/ files from client + return 0; + } + #return $file->{size} > 4096 ? 1 : 0; # FIXME return 1; # dedup } @@ -130,22 +151,31 @@ sub new_file { # $self->file_set($data); } -sub remove_file { +# client doesn't issue --delete +sub removed_file { my ( $self, $data ) = @_; - my $md5 = $self->md5sum($data)->get( $data->{file} ); +=for removed + my $path = $self->blob_path( $data ); + my $md5 = $self->md5_get( $path ); return unless $md5; # directories don't have md5sums my $path = $self->{md5pool} . '/' . $md5; my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size, $atime,$mtime,$ctime,$blksize,$blocks) = stat($path); - if ( $nlink == 1 ) { + + my $user = $self->{api}->user_info($data->{login}); + + if ( $nlink == 1 && $uid == $user->{uid} ) { + $self->append( $user, 'removed', -$size, $uid, $data->{file} ); my $id = getpwnam 'md5'; chown $id,$gid, $path; warn "# chown $id $gid $path"; } - $self->md5sum($data)->out( $data->{file} ); + $self->api->append_meta('md5sum', $user, 'delete', $data->{file} ); +=cut + } sub make_dir { @@ -158,6 +188,8 @@ sub new_link { warn "# new_link ",dump $data; +=for removed + if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) { my ( $dir, $to, $name ) = ( $1, $2, $3 ); my $path = $self->blob_path($data); @@ -179,9 +211,7 @@ sub new_link { }); # $name can contain directories so we must create them - my $to_dir = $d; - $to_dir =~ s{/[^/]+$}{}; - make_path $to_dir if ! -e $to_dir; + mkbasedir $d; if ( ! -e $s ) { warn "ERROR: can't find source $s"; @@ -208,72 +238,49 @@ sub new_link { warn "ERROR: can't SEND To:$to Name:$name Link:$link_to"; } } -} -sub md5sum { - my ( $self, $data ) = @_; - - my $login = $data->{login} || confess "missing login in ",dump $data; +=cut - return $self->{md5sum}->{$login} if exists $self->{md5sum}->{$login}; +} - my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) = - getpwnam $login; +sub init_pid_login { + my ( $self, $pid, $login ) = @_; - my $md5_path = "$dir/.md5"; + $login =~ s/\@.+//; + $self->{pid}->{$pid} = $self->{api}->user_info($login); - my $db = TokyoCabinet::HDB->new(); - $db->open($md5_path, $db->OWRITER | $db->OCREAT) - or die "can't open $md5_path: ",$db->errmsg( $db->ecode ); + warn "created $pid"; +} - warn "open $md5_path"; +sub cleanup_pid { + my ( $self, $pid ) = @_; - $self->{md5sum}->{$login} = $db; - return $db; + delete $self->{pid}->{$pid}; + warn "removed $pid"; } sub rsync_log { my ( $self, $data ) = @_; if ( $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) { my ( $pid, $module, $login ) = ( $1, $2, $3 ); - - $login =~ s/\@.+//; - my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) = - getpwnam $login; - - $self->{pid}->{$pid} = { - login => $login, - uid => $uid, - gid => $gid, - email => $email, - dir => $dir, - shell => $shell, - }; - - warn "created $pid"; - + $self->init_pid_login( $pid, $login ); } elsif ( $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) { my $pid = $1; - - foreach my $login ( keys %{ $self->{md5sum} } ) { - $self->{md5sum}->{$login}->close; - warn "close md5sum $login"; - } - delete $self->{md5sum}; - - delete $self->{pid}->{$pid}; - warn "removed $pid"; -warn dump $self; - + $self->cleanup_pid( $pid ); } else { -# warn "## rsync_log $data"; + warn "## rsync_log $data" if $ENV{DEBUG}; } } sub blob_path { - my ( $self, $data ) = @_; - my $blob = $self->{pid}->{ $data->{pid} }->{dir} || die "no dir for $data->{pid} in ",dump( $self->{pid} ); - $blob .= '/' . $data->{file}; + my ( $self, $data, $path ) = @_; + my $blob = $self->{pid}->{ $data->{pid} }->{dir}; + if ( ! $blob ) { + warn "ERROR: $data->{pid} not found, possible restart?"; + $self->init_pid_login( $data->{pid}, $data->{login} ); + $blob = $self->{pid}->{ $data->{pid} }->{dir} || die "no dir for ", dump( $self->{pid}->{ $data->{pid} } ); + } + $blob .= '/' . ( defined $path ? $path : $data->{file} ); return $blob; } @@ -296,14 +303,21 @@ sub rsync_transfer { } else { die "unknown type $type ", dump $data; } + $self->api->refresh_file_list( $data->{login} ); } elsif ( $data->{itemize} =~ m/\*deleting/ ) { - $self->remove_file($data); + $self->removed_file($data); + $self->api->refresh_file_list( $data->{login} ); } else { warn "IGNORED ",dump($data) if $ENV{DEBUG}; } return $data; } +sub append { + my $self = shift @_; + $self->{api}->append( @_ ); +} + sub md5pool { my ( $self, $data ) = @_; @@ -313,19 +327,41 @@ sub md5pool { my $md5 = $data->{md5} || die "no md5 in ",dump $data; my $path = $self->blob_path($data); - if ( -e "$pool/$md5" ) { + if ( ! -e $path ) { + warn "ERROR missing path $path"; + return; + } + + my $pool_md5 = "$pool/$md5"; + + if ( -e $pool_md5 ) { warn "dedup hit $md5 $path\n"; + + my ($pool_uid,$pool_size) = (stat($pool_md5))[4,7]; + my $user = $self->{api}->user_info( $data->{login} ); + + if ( $pool_uid != $user->{uid} ) { + if ( $pool_uid != $self->{api}->{md5}->{uid} ) { + chown $self->{api}->{md5}->{uid}, $self->{api}->{md5}->{gid}, $pool_md5; + chmod oct("0444"), $pool_md5; + my $steal_user = $self->{api}->user_info( $pool_uid ); + $self->append( $steal_user, 'dedup-steal', $pool_size, $pool_uid, $data->{file} ); + } + $self->append( $user, 'dedup', $pool_size, $pool_uid, $data->{file} ); + } + my $dedup = $path . '.dedup'; rename $path, $dedup; link "$pool/$md5", $path; unlink $dedup; - # FIXME fix perms? } else { link $path, "$pool/$md5"; warn "dedup +++ $md5 $path"; } - $self->md5sum($data)->put( $data->{file} => $md5 ); + $self->md5_set( $path => $md5 ); + + $self->api->append_meta('md5sum', $data->{login}, $md5, $data->{file} ); } my $empty_md5 = " " x 32; @@ -334,36 +370,38 @@ sub dedup { my ( $self, $data, $path ) = @_; if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) { - my $dir = $1; + my $dir = $1 || ''; my $imported = 0; warn "IMPORT ", $data->{file}, "\n"; open(my $md5sum, '<', $path); while(<$md5sum>) { chomp; my ( $md5, $file ) = split(/\s+/,$_,2); - if ( ! -e "$self->{md5path}/$md5" ) { + if ( ! $file ) { + warn "IGNORE $md5 without file\n"; + next; + } + if ( ! -e "$self->{md5pool}/$md5" ) { warn "MISSING $md5 $file\n"; next; } my $new = { + login => $data->{login}, pid => $data->{pid}, file => "$dir$file", md5 => $md5, }; my $new_path = $self->blob_path($new); if ( ! -e $new_path ) { - # create path from md5sum file - my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$}; - make_path $only_dir unless -d $only_dir; - $imported += link "$self->{md5path}/$md5", $new_path; - $self->new_file($new); - warn "import from $path ",dump($new); + $self->{api}->send_file( 'md5' => $md5, $data->{login}, "$dir$file" ); $self->md5pool( $new ); } else { $self->md5pool( $new ); } } - print "INFO imported $imported files from ",dump($data); + warn "INFO imported $imported files from ",dump($data); + + return; # don't put md5sum files into pool } if ( $data->{md5} ne $empty_md5 ) {