use correct pid when running init_pid_login after restart
[cloudstore.git] / lib / CloudStore / Store.pm
1 package CloudStore::Store;
2 use warnings;
3 use strict;
4
5 use lib 'lib';
6 use base 'CloudStore::MD5sum';
7 use CloudStore::API;
8
9 use autodie;
10 use JSON::XS;
11 use File::Path qw(make_path);
12 use File::Slurp qw();
13 use Digest::MD5 qw(md5_base64);
14 use Data::Dump qw(dump);
15 use Carp qw(confess);
16
17 use WarnColor;
18
# Constructor.
#
# $group is handed unchanged to CloudStore::API->new. The md5 pool directory
# is read from the API object's md5 settings ($api->{md5}->{dir}) and cached
# in $self->{md5pool}. With $ENV{DEBUG} set, the new object is dumped.
sub new {
	my $class = shift;
	my $group = shift;

	my $api  = CloudStore::API->new( $group );
	my $self = bless { api => $api }, $class;

	$self->{md5pool} = $api->{md5}->{dir};

	warn "# new ", dump $self if $ENV{DEBUG};

	return $self;
}
33
# Accessor for the CloudStore::API instance stored by new().
sub api {
	my ($self) = @_;
	return $self->{api};
}
35
# Make sure the directory that will contain $path exists, creating any
# missing ancestors as well (plain mkdir only creates one level, so a
# nested parent used to fail).
#
# $path is a file path; its last component is the file name and is never
# created. Paths without a directory part ("name", "/name") need nothing.
# Plain function, not a method.
sub mkbasedir {
	my $dir = shift;

	# strip filename; if there is no "/<name>" suffix there is no parent to make
	return unless $dir =~ s{/[^/]+\z}{};
	return if $dir eq '';	# "/name": parent is the filesystem root

	make_path($dir) unless -e $dir;
	return;
}
41
# Decide whether a freshly received regular file should be deduplicated.
#
# Returns 0 for anything inside a ".sync/" directory (at any depth); these
# are client-side control files. Returns 1 (dedup) for everything else.
# The dot in ".sync" is matched literally: previously it was an unescaped
# regex metacharacter, so e.g. "photos/async/x.jpg" was wrongly skipped.
sub modify_file {
	my ( $self,$data ) = @_;

=for removed

	if ( $data->{file} =~ m{^(.*/)?.sync/send/([^/]+)$} ) {
		my $from_dir = $1;
		warn "SEND $2 from $from_dir\n";
		my $sent_files;
		open(my $send, '<', $self->blob_path($data) );
		while(<$send>) {
			s/[\n\r]+$//;

			my ( $to, $file ) = split(/\s+/,$_,2);
			my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
				getpwnam $to;

			my $from = $data;
			$from->{file} = $from_dir . $file;
			my $from_path = $self->blob_path($from);

			if ( ! -r $from_path ) {
				warn "ERROR: $from_path: $!";
				next;
			}

			my $to_path = "$dir/received/$file";
			mkbasedir $to_path;

			warn "SEND $from_path -> $to_path\n";
			unlink $to_path if -e $to_path; # FIXME why we need this?
			$sent_files->{$to} += link $from_path, $to_path;
			# FIXME cross-shard
			my $md5 = $self->md5sum($data)->get( $from_dir . $file ) || warn "no md5 for $from_dir$file";
			$self->md5sum({login => $to})->put( "/received/$file" => $md5 );
		}

		warn "SENT ",dump $sent_files;

		return 0; # skip dedup
	} elsif ( $data->{file} =~ m{^(.*/)?.sync/pending/([^/]+)$} ) {
		my $from_dir = $1;
		warn "PENDIG $2 from $from_dir";
		open(my $pend, '<', $self->blob_path($data) );
		while(<$pend>) {
			s/[\n\r]+$//;

			if ( m/^DELETED\#(.+)$/ ) {
				my $path = $self->blob_path($data => $from_dir . $1 );
				if ( -e $path ) {
					warn "UNLINK $path";
					-d $path ? rmdir $path : unlink $path || warn "ERROR: unlink $path $!";
					next;
				} else {
					warn "MISSING $path to unlink";
					next;
				}
			} elsif ( ! /^(MOVED|RENAMED)\#/ ) {
				warn "skip $_\n";
				next;
			}

			my ( undef, $from, $to ) = split(/\#/,$_,3);

			my ( $from_path, $to_path ) = map {
				my $tmp = $data;
				$tmp->{file} = $from_dir . $_;
				$self->blob_path($tmp);
			} ( $from, $to );

			if ( ! -e $from_path ) {
				warn "SKIPPED $from_path: $!";
				next;
			}

			warn "MV $from_path -> $to_path";
			mkbasedir $to_path;
			rename $from_path, $to_path;

			my $md5 = $self->md5sum($data)->get( $from_dir . $from );
			if ( ! $md5 ) {
				warn "ERROR: no md5sum $from_dir $from " unless $md5;
				next;
			}

			$self->md5sum($data)->out( $from_dir . $from );
			$self->md5sum($data)->put( $from_dir . $to => $md5 );
			$self->md5sum_close($data);

			warn "$md5 moved to $from_dir $to";
		}

		return 0; # skip dedup
	}

=cut

	# ignore .sync/ files from client: ".sync/" at the start of the path or
	# right after a "/", with the dot matched literally
	if ( $data->{file} =~ m{(?:\A|/)\.sync/} ) {
		return 0;
	}

	#return $file->{size} > 4096 ? 1 : 0; # FIXME
	return 1; # dedup
}
147
# Hook for a newly created file. Currently a deliberate no-op; it is never
# called by rsync directly. The explicit bare return keeps the stub from
# leaking its unpacked argument list to callers as an implicit return value.
sub new_file {
	my ( $self,$data ) = @_;
#	$self->file_set($data);
	return;
}
153
# Handle a "*deleting" record for $data->{file} (note: the client doesn't
# issue --delete).
#
# If the file has no recorded md5sum (e.g. a directory) nothing happens.
# Otherwise it stats the pool copy "<md5pool>/<md5>". If only one link is
# left and it is owned by this user, it records a 'removed' entry of -size
# for the user and chowns the pool file to the "md5" account. In every case
# it then drops the md5sum entry and appends a 'delete' meta record.
#
# Previously a vanished pool file made stat() return nothing and the link/uid
# comparisons ran on undef. A missing "md5" account made chown() receive an
# undef uid, which numifies to 0 (root). Both cases are now reported instead.
sub removed_file {
	my ( $self, $data ) = @_;

	my $md5 = $self->md5sum($data)->get( $data->{file} );
	return unless $md5; # directories don't have md5sums
	my $path = $self->{md5pool} . '/' . $md5;

	# nlink, uid, gid, size of the pool copy; all undef if stat() fails
	my ( $nlink, $uid, $gid, $size ) = ( stat($path) )[ 3, 4, 5, 7 ];

	my $user = $self->{api}->user_info($data->{login});

	if ( ! defined $nlink ) {
		warn "ERROR: missing pool file $path for $data->{file}";
	} elsif ( $nlink == 1 && $uid == $user->{uid} ) {
		# only the pool link remains and this user still owns it
		$self->append( $user, 'removed', -$size, $uid, $data->{file} );
		my $id = getpwnam 'md5';
		if ( defined $id ) {
			chown $id,$gid, $path;
			warn "# chown $id $gid $path";
		} else {
			# chown() with an undef uid would hand the file to uid 0
			warn "ERROR: no md5 user, not changing owner of $path";
		}
	}

	$self->md5sum($data)->out( $data->{file} );
	$self->md5sum_close($data);

	$self->api->append_meta('md5sum', $user, 'delete', $data->{file} );
}
179
# Hook for a directory received from the client (called by rsync_transfer).
# Nothing needs to be done for directories yet. The explicit bare return
# keeps the stub from leaking its unpacked arguments as a return value.
sub make_dir {
	my ( $self, $data ) = @_;
	return;
}
184
# Hook for a symlink received from the client (called by rsync_transfer).
# At the moment it only logs the transfer record. The earlier ".send/"
# link-forwarding logic is kept below, disabled inside a POD block.
sub new_link {
	my $self = shift;
	my $data = shift;

	warn '# new_link ', dump($data);

=for removed

	if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
		my ( $dir, $to, $name ) = ( $1, $2, $3 );
		my $path = $self->blob_path($data);
		my $link_to = readlink $path;
		warn "$link_to";
		if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {

			my $s = $path;
			$s =~ s{/[^/]+$}{}; # strip filename
			while ( $link_to =~ s{/../}{/} ) {
				$s =~ s{/[^/]+$}{} || die "can't strip $s";
				warn "## simplify $s $link_to\n";
			}
			$s .= $link_to;

			my $d = $self->blob_path({
				pid => $data->{pid},
				file => $name
			});

			# $name can contain directories so we must create them
			mkbasedir $d;

			if ( ! -e $s ) {
				warn "ERROR: can't find source $s";
			} else {

				warn "link $s -> $d\n";
				link $s, $d;

				my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};

#				my $origin = $self->file_get({
#					login => $l,
#					file  => $f,
#				});
#				$self->new_file($origin);
				warn "INFO: sent file ",dump($l,$f);
				my $md5 = $self->md5sum($data)->get($s);
				$self->md5sum({ login => $to })->put($d => $md5 );
				# FIXME broken!
			}


		} else {
			warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
		}
	}

=cut

}
244
# Register rsync session $pid for $login.
#
# Everything from the first '@' in $login onwards is dropped. The remaining
# name is looked up with the API's user_info(), and the result is stored as
# $self->{pid}->{$pid}, which is where blob_path() reads the session's {dir}.
sub init_pid_login {
	my $self  = shift;
	my $pid   = shift;
	my $login = shift;

	( my $user = $login ) =~ s/\@.+//;
	my $info = $self->{api}->user_info($user);
	$self->{pid}->{$pid} = $info;

	warn "created $pid";
}
253
# Tear down rsync session $pid.
#
# Passes the session's stored user record (as set by init_pid_login) to
# md5sum_close(), then removes the session entry for $pid.
sub cleanup_pid {
	my $self = shift;
	my $pid  = shift;

	my $session = $self->{pid}->{$pid};
	$self->md5sum_close( $session );

	delete $self->{pid}->{$pid};
	warn "removed $pid";
}
262
# Dispatch one line of the rsync daemon log.
#
# A "[pid] rsync <word> <module> <word> <login>" line opens a session via
# init_pid_login(). A "[pid] sent N bytes  received M bytes" summary line
# closes it via cleanup_pid(). Anything else is only echoed as a warning.
sub rsync_log {
	my ( $self, $data ) = @_;

	if ( my ( $pid, undef, $login ) = $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) {
		# the second capture (module name) is not used
		$self->init_pid_login( $pid, $login );
	}
	elsif ( my ( $finished_pid ) = $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
		$self->cleanup_pid( $finished_pid );
	}
	else {
		warn "## rsync_log $data";
	}
}
275
# Build the on-disk path of a file in the user's blob directory.
#
# The base directory is the {dir} of the session registered for
# $data->{pid}. If no session is known for that pid, the session is
# re-created from $data->{login} via init_pid_login(); the warning text
# suggests a daemon restart as the cause. The method dies if there is still
# no dir. The relative part is $path when given, otherwise $data->{file}.
sub blob_path {
	my ( $self, $data, $path ) = @_;

	my $pid = $data->{pid};
	my $dir = $self->{pid}->{$pid}->{dir};

	unless ( $dir ) {
		warn "ERROR: $pid not found, possible restart?";
		$self->init_pid_login( $pid, $data->{login} );
		$dir = $self->{pid}->{$pid}->{dir}
			|| die "no dir for ", dump( $self->{pid}->{$pid} );
	}

	my $relative = defined $path ? $path : $data->{file};
	return "$dir/$relative";
}
287
288
# Handle one itemized transfer record from the rsync log.
#
# Routing depends on $data->{itemize}:
# - a received ("c" or ">") regular file ("f") goes to modify_file(), and to
#   dedup() when modify_file() returns true;
# - a received directory ("d") goes to make_dir();
# - a received symlink ("L") goes to new_link();
# - "*deleting" goes to removed_file();
# - anything else is ignored, and dumped only when $ENV{DEBUG} is set.
# Always returns $data.
sub rsync_transfer {
	my ( $self, $data ) = @_;

	# computed for every record; blob_path() also re-registers an unknown pid
	my $path = $self->blob_path($data);

	if ( my ( $type ) = $data->{itemize} =~ m/^[c>]([fdL])/ ) {
		my %handler_for = (
			# selective dedup: modify_file() decides whether to dedup
			f => sub { $self->modify_file( $data ) && $self->dedup( $data, $path ) },
			d => sub { $self->make_dir( $data ) },
			L => sub { $self->new_link( $data ) },
		);
		my $handler = $handler_for{$type} or die "unknown type $type ", dump $data;
		$handler->();
	}
	elsif ( $data->{itemize} =~ m/\*deleting/ ) {
		$self->removed_file($data);
	}
	elsif ( $ENV{DEBUG} ) {
		warn "IGNORED ",dump($data);
	}

	return $data;
}
314
# Forward all remaining arguments unchanged to the API object's append().
# The arguments are passed as an @_ slice, so they stay aliased to the
# caller's values exactly as before.
sub append {
	my ( $self ) = @_;
	my $api = $self->{api};
	return $api->append( @_[ 1 .. $#_ ] );
}
319
# Put a received file into the shared md5 pool, or dedup it against an
# existing pool entry, then record its checksum for the user.
#
# $data needs {md5}, {file}, {login} and {pid} (the latter for blob_path()).
# Dies if there is no pool directory or no md5. Returns nothing if the
# received file is missing. Otherwise returns whatever append_meta() returns.
sub md5pool {
	my ( $self, $data ) = @_;

	my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
	mkdir $pool if ! -e $pool;

	my $md5  = $data->{md5} || die "no md5 in ",dump $data;
	my $path = $self->blob_path($data);

	unless ( -e $path ) {
		warn "ERROR missing path $path";
		return;
	}

	my $pool_md5 = "$pool/$md5";

	if ( ! -e $pool_md5 ) {
		# first copy of this content: the user's file becomes the pool entry
		link $path, $pool_md5;
		warn "dedup +++ $md5 $path";
	}
	else {
		warn "dedup hit $md5 $path\n";

		my @pool_stat = stat $pool_md5;
		my $pool_uid  = $pool_stat[4];
		my $pool_size = $pool_stat[7];
		my $user = $self->{api}->user_info( $data->{login} );

		if ( $pool_uid != $user->{uid} ) {
			my $md5_owner = $self->{api}->{md5};
			# the pool copy is neither this user's nor the md5 account's: hand it
			# to the md5 account, make it read-only and log a 'dedup-steal'
			if ( $pool_uid != $md5_owner->{uid} ) {
				chown $md5_owner->{uid}, $md5_owner->{gid}, $pool_md5;
				chmod 0444, $pool_md5;
				my $steal_user = $self->{api}->user_info( $pool_uid );
				$self->append( $steal_user, 'dedup-steal', $pool_size, $pool_uid, $data->{file} );
			}
			$self->append( $user, 'dedup', $pool_size, $pool_uid, $data->{file} );
		}

		# replace the received file with a hard link to the pool copy; the
		# received copy is moved aside and only removed after linking
		my $aside = "$path.dedup";
		rename $path, $aside;
		link $pool_md5, $path;
		unlink $aside;
	}

	$self->md5sum($data)->put( $data->{file} => $md5 );
	$self->md5sum_close($data);

	return $self->api->append_meta('md5sum', $data->{login}, $md5, $data->{file} );
}
366
# Placeholder checksum of 32 spaces; dedup() does not pool a transfer whose
# md5 equals it (presumably what the log reports when no checksum was
# computed -- TODO confirm against the log parser).
my $empty_md5 = ' ' x 32;
368
# Dedup a received regular file (called only when modify_file() allowed it).
#
# A file named "md5sum" (in any directory) is treated as a manifest in
# md5sum(1) format, one "<hex checksum> <name>" per line. Each listed file
# whose checksum already exists in the pool is brought in through md5pool(),
# after asking the API to send it if it is not yet in the user's tree. The
# manifest itself is never pooled. Any other file is pooled under
# $data->{md5} unless that is the blank placeholder $empty_md5.
#
# The manifest is uploaded by the client, so it is untrusted. The checksum
# must be plain hex because it becomes a pool file name; a "../" there would
# escape the pool, and an empty one would resolve to the pool directory
# itself and abort the import. File names with ".." components are refused
# so they cannot escape the user's tree. Blank or malformed lines (including
# CRLF endings) are skipped. The "imported" count now actually counts.
#
# $data - transfer record ({file}, {md5}, {login}, {pid})
# $path - on-disk path of the received file
sub dedup {
	my ( $self, $data, $path ) = @_;

	if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
		my $dir = $1 || '';
		my $imported = 0;
		warn "IMPORT ", $data->{file}, "\n";
		open(my $md5sum, '<', $path);
		while ( my $line = <$md5sum> ) {
			$line =~ s/[\r\n]+$//;

			my ( $md5, $file ) = $line =~ m/^([0-9a-fA-F]+)\s+(.+)$/;
			if ( ! defined $file ) {
				warn "SKIP malformed md5sum line: $line\n" if $line =~ /\S/;
				next;
			}
			if ( $file =~ m{(?:^|/)\.\.(?:/|$)} ) {
				warn "SKIP unsafe path $file\n";
				next;
			}

			if ( ! -e "$self->{md5pool}/$md5" ) {
				warn "MISSING $md5 $file\n";
				next;
			}
			my $new = {
				login => $data->{login},
				pid => $data->{pid},
				file => "$dir$file",
				md5 => $md5,
			};
			my $new_path = $self->blob_path($new);
			if ( ! -e $new_path ) {
				# not in the user's tree yet; presumably send_file creates it
				# from the pool copy -- TODO confirm in CloudStore::API
				$self->{api}->send_file( 'md5' => $md5, $data->{login}, "$dir$file" );
			}
			$self->md5pool( $new );
			$imported++;
		}
		close $md5sum;
		warn "INFO imported $imported files from ",dump($data);

		return; # don't put md5sum files into pool
	}

	if ( $data->{md5} ne $empty_md5 ) {
		$self->md5pool( $data );
	} else {
		warn "empty md5", dump $data;
	}
}
409
1; # a module file must end with a true value so use/require succeed