remove file send via file API
[cloudstore.git] / lib / CloudStore / Store.pm
1 package CloudStore::Store;
2 use warnings;
3 use strict;
4
5 use lib 'lib';
6 use base 'CloudStore::MD5sum';
7 use CloudStore::API;
8
9 use autodie;
10 use JSON::XS;
11 use File::Path qw(make_path);
12 use File::Slurp qw();
13 use Digest::MD5 qw(md5_base64);
14 use Data::Dump qw(dump);
15 use Carp qw(confess);
16
17 use WarnColor;
18
# Constructor.  $group is handed unchanged to CloudStore::API->new and the
# resulting API object is kept in $self->{api}.  The md5 pool directory the
# API object carries ($api->{md5}{dir}) is cached as $self->{md5pool}, which
# md5pool() and removed_file() use.  With $ENV{DEBUG} set the new object is
# dumped via warn.
sub new {
	my ( $class, $group ) = @_;

	my $self = bless { api => CloudStore::API->new( $group ) }, $class;
	$self->{md5pool} = $self->{api}{md5}{dir};

	if ( $ENV{DEBUG} ) {
		warn "# new ", dump $self;
	}

	return $self;
}
33
# Make sure the directory that will hold $path exists.
#
# Everything after the last "/" is the file name and is stripped.  The
# remaining directory is created together with any missing parents
# (File::Path::make_path), so nested relative paths work.  A path without a
# directory component ("name" or "/name") needs no directory, so nothing is
# created for it -- previously such a name was itself turned into a
# directory.  make_path croaks if a directory cannot be created.
sub mkbasedir {
	my $path = shift;

	# strip filename; bail out when there is no directory part
	my ( $dir ) = $path =~ m{^(.+)/[^/]+$} or return;

	make_path( $dir ) unless -e $dir;
}
39
# Decide whether a file received from the client should be deduplicated.
#
# Returns 0 (skip dedup) for anything inside a ".sync/" directory -- those
# are client control files -- and 1 (dedup) for everything else.  The dot
# is matched literally: previously the unescaped "." also matched any
# character, so ordinary directories such as "async/" or "xsync/" were
# wrongly excluded from dedup.
#
# The former .sync/send and .sync/pending processing (file send via the
# file API) is kept below as POD for reference only; it is not compiled.
sub modify_file {
	my ( $self,$data ) = @_;

=for removed

	if ( $data->{file} =~ m{^(.*/)?.sync/send/([^/]+)$} ) {
		my $from_dir = $1;
		warn "SEND $2 from $from_dir\n";
		my $sent_files;
		open(my $send, '<', $self->blob_path($data) );
		while(<$send>) {
			s/[\n\r]+$//;

			my ( $to, $file ) = split(/\s+/,$_,2);
			my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
				getpwnam $to;

			my $from = $data;
			$from->{file} = $from_dir . $file;
			my $from_path = $self->blob_path($from);

			if ( ! -r $from_path ) {
				warn "ERROR: $from_path: $!";
				next;
			}

			my $to_path = "$dir/received/$file";
			mkbasedir $to_path;

			warn "SEND $from_path -> $to_path\n";
			unlink $to_path if -e $to_path; # FIXME why we need this?
			$sent_files->{$to} += link $from_path, $to_path;
			# FIXME cross-shard
			my $md5 = $self->md5sum($data)->get( $from_dir . $file ) || warn "no md5 for $from_dir$file";
			$self->md5sum({login => $to})->put( "/received/$file" => $md5 );
		}

		warn "SENT ",dump $sent_files;

		return 0; # skip dedup
	} elsif ( $data->{file} =~ m{^(.*/)?.sync/pending/([^/]+)$} ) {
		my $from_dir = $1;
		warn "PENDIG $2 from $from_dir";
		open(my $pend, '<', $self->blob_path($data) );
		while(<$pend>) {
			s/[\n\r]+$//;

			if ( m/^DELETED\#(.+)$/ ) {
				my $path = $self->blob_path($data => $from_dir . $1 );
				if ( -e $path ) {
					warn "UNLINK $path";
					-d $path ? rmdir $path : unlink $path || warn "ERROR: unlink $path $!";
					next;
				} else {
					warn "MISSING $path to unlink";
					next;
				}
			} elsif ( ! /^(MOVED|RENAMED)\#/ ) {
				warn "skip $_\n";
				next;
			}

			my ( undef, $from, $to ) = split(/\#/,$_,3);

			my ( $from_path, $to_path ) = map {
				my $tmp = $data;
				$tmp->{file} = $from_dir . $_;
				$self->blob_path($tmp);
			} ( $from, $to );

			if ( ! -e $from_path ) {
				warn "SKIPPED $from_path: $!";
				next;
			}

			warn "MV $from_path -> $to_path";
			mkbasedir $to_path;
			rename $from_path, $to_path;

			my $md5 = $self->md5sum($data)->get( $from_dir . $from );
			if ( ! $md5 ) {
				warn "ERROR: no md5sum $from_dir $from " unless $md5;
				next;
			}

			$self->md5sum($data)->out( $from_dir . $from );
			$self->md5sum($data)->put( $from_dir . $to => $md5 );

			warn "$md5 moved to $from_dir $to";
		}

		return 0; # skip dedup
	}

=cut

	if ( $data->{file} =~ m{^(?:.*/)?\.sync/} ) {
		# ignore .sync/ files from client
		return 0;
	}

	#return $file->{size} > 4096 ? 1 : 0; # FIXME
	return 1; # dedup
}
144
# Hook for a newly created file.  rsync never calls this directly; the body
# only unpacks its arguments (the old file_set() call is commented out).
sub new_file {
	my ( $store, $info ) = @_;
#	$store->file_set($info);
}
150
# Handle an rsync "*deleting" record (dispatched from rsync_transfer).
# Note: the client doesn't issue --delete.
#
# Looks up the md5 recorded for $data->{file}; entries without one
# (directories) are ignored.  When the pool file for that md5 is down to a
# single link and is owned by the user, its size is credited back through
# append( $user, 'removed', -size, ... ) and the pool file is chowned to the
# "md5" account (keeping its group).  Finally the md5 entry for the file is
# dropped via ->out, also when the pool file can't be stat'ed.
sub removed_file {
	my ( $self, $data ) = @_;

	my $md5 = $self->md5sum($data)->get( $data->{file} );
	return unless $md5; # directories don't have md5sums

	my $path = $self->{md5pool} . '/' . $md5;
	# stat fields used: [3] nlink, [4] uid, [5] gid, [7] size
	my ( $nlink, $uid, $gid, $size ) = ( stat $path )[ 3, 4, 5, 7 ];

	my $user = $self->{api}->user_info( $data->{login} );

	if ( ! defined $nlink ) {
		warn "ERROR: can't stat $path: $!";
	} elsif ( $nlink == 1 && defined $user->{uid} && $uid == $user->{uid} ) {
		$self->append( $user, 'removed', -$size, $uid, $data->{file} );
		my $id = getpwnam 'md5';
		if ( defined $id ) {
			chown $id, $gid, $path;
			warn "# chown $id $gid $path";
		} else {
			# an undefined uid would be treated as 0 and hand the file to root
			warn "ERROR: no md5 user, not changing owner of $path";
		}
	}

	$self->md5sum($data)->out( $data->{file} );
}
173
# Hook for a received directory (rsync itemize type "d").  The body only
# unpacks its arguments; no directory bookkeeping is done here.
sub make_dir {
	my ( $store, $info ) = @_;
}
178
# Hook for a received symlink (rsync itemize type "L").  It currently only
# dumps the record via warn.  The former ".send/<user>/<name>" symlink
# handling is retained below as POD for reference; it is not compiled.
sub new_link {
	my ( $self, $data ) = @_;

	warn "# new_link ", dump( $data );

=for removed

	if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
		my ( $dir, $to, $name ) = ( $1, $2, $3 );
		my $path = $self->blob_path($data);
		my $link_to = readlink $path;
		warn "$link_to";
		if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {

			my $s = $path;
			$s =~ s{/[^/]+$}{}; # strip filename
			while ( $link_to =~ s{/../}{/} ) {
				$s =~ s{/[^/]+$}{} || die "can't strip $s";
				warn "## simplify $s $link_to\n";
			}
			$s .= $link_to;

			my $d = $self->blob_path({
				pid => $data->{pid},
				file => $name
			});

			# $name can contain directories so we must create them
			mkbasedir $d;

			if ( ! -e $s ) {
				warn "ERROR: can't find source $s";
			} else {

				warn "link $s -> $d\n";
				link $s, $d;

				my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};

#				my $origin = $self->file_get({
#					login => $l,
#					file  => $f,
#				});
#				$self->new_file($origin);
				warn "INFO: sent file ",dump($l,$f);
				my $md5 = $self->md5sum($data)->get($s);
				$self->md5sum({ login => $to })->put($d => $md5 );
				# FIXME broken!
			}


		} else {
			warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
		}
	}

=cut

}
238
# Start tracking an rsync process: any "@..." suffix is stripped from the
# login, and the API's user_info for the remaining name is stored under
# $self->{pid}{$pid} (blob_path() later reads ->{dir} from it).
sub init_pid_login {
	my ( $self, $pid, $login ) = @_;

	( my $user_name = $login ) =~ s/\@.+//;
	$self->{pid}{$pid} = $self->{api}->user_info( $user_name );

	warn "created $pid";
}
247
# Stop tracking a finished rsync process: calls md5sum_close (an inherited
# method -- presumably from CloudStore::MD5sum) and then forgets the
# per-pid user info stored by init_pid_login.
sub cleanup_pid {
	my $self = shift;
	my $pid  = shift;

	$self->md5sum_close;

	delete $self->{pid}{$pid};
	warn "removed $pid";
}
256
# Handle one rsync daemon log line.
#   "[pid] rsync <word> <module> <word> <login>" -> init_pid_login(pid, login)
#   "[pid] sent N bytes  received M bytes"       -> cleanup_pid(pid)
# Any other line is echoed via warn with a "## rsync_log" prefix.
sub rsync_log {
	my ( $self, $line ) = @_;

	if ( my ( $pid, undef, $login ) = $line =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) {
		return $self->init_pid_login( $pid, $login );
	}

	if ( my ( $pid ) = $line =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
		return $self->cleanup_pid( $pid );
	}

	warn "## rsync_log $line";
}
269
# Path of a file inside the blob dir of the user that owns rsync process
# $data->{pid} (the ->{dir} stored by init_pid_login).  The relative part is
# $path when given, otherwise $data->{file}.  Dies, dumping the known pids,
# when no dir is recorded for that pid.
sub blob_path {
	my ( $self, $data, $path ) = @_;

	my $base = $self->{pid}{ $data->{pid} }{dir}
		|| die "no dir for $data->{pid} in ", dump( $self->{pid} );
	my $relative = defined $path ? $path : $data->{file};

	return "$base/$relative";
}
276
277
# Handle one rsync transfer record; $data->{itemize} holds rsync's
# itemized-changes string.
#   - "c" or ">" followed by f / d / L (received file, dir, symlink) goes to
#     modify_file (then dedup, if modify_file returns true), make_dir or
#     new_link;
#   - "*deleting" goes to removed_file;
#   - anything else is ignored (dumped via warn when $ENV{DEBUG} is set).
# blob_path() is resolved up front for every record, so an unknown pid dies
# here.  Returns $data.
sub rsync_transfer {
	my ( $self, $data ) = @_;

	my $path = $self->blob_path( $data );

	my %on_received = (
		# selective dedup: only files modify_file approves are deduped
		f => sub { $self->modify_file( $data ) && $self->dedup( $data, $path ) },
		d => sub { $self->make_dir( $data ) },
		L => sub { $self->new_link( $data ) },
	);

	if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
		my $type = $1;
		my $handler = $on_received{$type}
			or die "unknown type $type ", dump $data;
		$handler->();
	} elsif ( $data->{itemize} =~ m/\*deleting/ ) {
		$self->removed_file( $data );
	} else {
		warn "IGNORED ", dump( $data ) if $ENV{DEBUG};
	}

	return $data;
}
303
# Pass an accounting record straight through to the API object's append(),
# returning whatever it returns (in the caller's context).
sub append {
	return shift->{api}->append( @_ );
}
308
# Put the blob for $data into the shared md5 pool and record its md5.
#
# The pool is the directory $self->{md5pool} (created if missing), holding
# one file per md5 named by $data->{md5}.
#  - Pool hit: the pool file's size is charged to the user via
#    append( $user, 'dedup', ... ).  If the pool file isn't owned by the md5
#    account yet, it is chowned to the md5 uid/gid, made read-only (0444),
#    and its previous owner is credited via append( ..., 'dedup-steal', ... ).
#    The user's copy is then replaced by a hard link to the pool file.  If
#    linking fails, the user's original file is put back before the error
#    is re-raised, so it is never left renamed aside or lost.
#  - Pool miss: the user's file is hard-linked into the pool as the new
#    pool copy.
# Dies when no pool is configured or $data has no md5.
sub md5pool {
	my ( $self, $data ) = @_;

	my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
	mkdir $pool unless -e $pool;

	my $md5 = $data->{md5} || die "no md5 in ",dump $data;
	my $path = $self->blob_path($data);

	my $pool_md5 = "$pool/$md5";

	if ( -e $pool_md5 ) {
		warn "dedup hit $md5 $path\n";

		my ($pool_uid,$pool_size) = (stat($pool_md5))[4,7];
		my $user = $self->{api}->user_info( $data->{login} );

		if ( $pool_uid != $self->{api}->{md5}->{uid} ) {
			chown $self->{api}->{md5}->{uid}, $self->{api}->{md5}->{gid}, $pool_md5;
			chmod oct("0444"), $pool_md5;
			my $steal_user = $self->{api}->user_info( $pool_uid );
			$self->append( $steal_user, 'dedup-steal', $pool_size, $pool_uid, $data->{file} );
		}
		$self->append( $user, 'dedup', $pool_size, $pool_uid, $data->{file} );

		# move the user's copy aside, link the pool copy in its place, and
		# only then drop the old copy
		my $dedup = $path . '.dedup';
		rename $path, $dedup;
		if ( ! eval { link $pool_md5, $path } ) {
			# link() either threw (autodie) or returned false; restore the
			# user's file before propagating the failure
			my $error = $@ || "link $pool_md5 $path: $!";
			rename $dedup, $path;
			die $error;
		}
		unlink $dedup;
	} else {
		link $path, $pool_md5;
		warn "dedup +++ $md5 $path";
	}

	$self->md5sum($data)->put( $data->{file} => $md5 );
}
345
# Value of $data->{md5} meaning "no checksum" (32 blanks) -- presumably what
# the rsync log reports when no md5 was computed; TODO confirm.  Files with
# this md5 are not put into the pool.
my $empty_md5 = " " x 32;

# Import a client-uploaded "md5sum" file at $path.  Each line is
# "<md5> <name>" as written by md5sum(1); names are relative to $dir, the
# md5sum file's own directory.  Entries whose md5 already exists in the pool
# are materialized at "$dir$name".  If that file doesn't exist in the user's
# blob dir yet, the API's send_file is asked to deliver it from the "md5"
# account first.  md5pool() then links it to the pool copy and records its
# md5.  Returns the number of imported entries.
sub _import_md5sum {
	my ( $self, $data, $path, $dir ) = @_;

	my $imported = 0;
	open( my $md5sum, '<', $path );
	while ( my $line = <$md5sum> ) {
		$line =~ s/[\n\r]+$//;
		my ( $md5, $file ) = split( /\s+/, $line, 2 );
		next unless defined $file && length $file; # blank/garbage line
		$file =~ s/^\*//; # md5sum binary-mode marker

		# both fields come from the client: accept only a real hex md5 and a
		# name without ".." components, so neither the pool lookup nor the
		# target path can escape its directory
		if ( $md5 !~ m/^[0-9a-fA-F]{32}$/ || $file =~ m{(?:^|/)\.\.(?:/|$)} ) {
			warn "SKIPPED invalid md5sum line: $line\n";
			next;
		}

		if ( ! -e "$self->{md5pool}/$md5" ) {
			warn "MISSING $md5 $file\n";
			next;
		}

		my $new = {
			pid   => $data->{pid},
			login => $data->{login}, # md5pool/md5sum need the owner
			file  => "$dir$file",
			md5   => $md5,
		};
		if ( ! -e $self->blob_path($new) ) {
			$self->{api}->send_file( 'md5' => $md5, $data->{login}, "$dir$file" );
		}
		$self->md5pool( $new );
		$imported++;
	}
	close $md5sum;

	return $imported;
}

# Deduplicate a received file; $path is its location in the user's blob dir.
#
# A file named "md5sum" (at any depth) is first treated as an import list
# (see _import_md5sum).  Then, unless its md5 is the blank $empty_md5, the
# file's md5 is recorded and the file is handed to md5pool(); a blank md5 is
# only reported via warn.
sub dedup {
	my ( $self, $data, $path ) = @_;

	if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
		my $dir = defined $1 ? $1 : '';
		warn "IMPORT ", $data->{file}, "\n";
		my $imported = $self->_import_md5sum( $data, $path, $dir );
		warn "INFO imported $imported files from ", dump($data), "\n";
	}

	if ( $data->{md5} ne $empty_md5 ) {
		$self->md5sum($data)->put( $data->{file} => $data->{md5} );
		$self->md5pool( $data );
	} else {
		warn "empty md5", dump $data;
	}
}

1;