rmdir directories
[cloudstore.git] / lib / CloudStore / Store.pm
1 package CloudStore::Store;
2 use warnings;
3 use strict;
4
5 use autodie;
6 use JSON::XS;
7 use File::Path qw(make_path);
8 use File::Slurp qw();
9 use Digest::MD5 qw(md5_base64);
10 use Data::Dump qw(dump);
11 use Carp qw(confess);
12 use TokyoCabinet;
13
14 use WarnColor;
15
# Constructor: CloudStore::Store->new( dir => $path, ... )
#
# Every key/value argument becomes a field of the object. 'dir' is
# mandatory (dies with "no dir" when it is missing or false); the md5 pool
# location is derived from it as "<dir>/md5".
#
# Returns the blessed object.
sub new {
	my ( $class, @args ) = @_;

	my $self = { @args };
	bless $self, $class;

	$self->{dir} or die "no dir";
	$self->{md5pool} = "$self->{dir}/md5";

	# constructor tracing is only emitted when DEBUG is set
	if ( $ENV{DEBUG} ) {
		warn "# new ",dump $self;
	}

	return $self;
}
29
# Placeholder for storing per-user data; not implemented yet.
#
#   $self - store instance
#   $data - presumably a user record (unused here; confirm against callers)
#
# Returns nothing (empty list in list context, undef in scalar context).
sub user_set {
	my ( $self, $data ) = @_;

	# explicit bare return: without it the value of the argument assignment
	# above would leak to the caller as an accidental result
	return;
}
33
# Placeholder for fetching per-user data; not implemented yet.
#
#   $self - store instance
#   $data - presumably a user lookup record (unused here; confirm against callers)
#
# Returns nothing (empty list in list context, undef in scalar context).
sub user_get {
	my ( $self, $data ) = @_;

	# explicit bare return: without it the value of the argument assignment
	# above would leak to the caller as an accidental result
	return;
}
37
# Make sure the directory that will contain a file exists.
#
#   $dir - path of a file; everything from the last '/' on is treated as
#          the file name and stripped
#
# Missing intermediate directories are created too. A path without any '/'
# (or a file directly under '/') has no directory part to create, so
# nothing is done for it.
#
# Returns nothing; make_path croaks on fatal errors.
sub mkbasedir {
	my $dir = shift;

	return unless defined $dir;

	# strip filename; when nothing was stripped $dir is still the bare file
	# name and must not be turned into a directory
	$dir =~ s{/[^/]+$}{} or return;
	return if $dir eq '';

	make_path($dir) unless -e $dir;
	return;
}
43
# Handle an uploaded regular file that may be a sync control file.
#
# Two kinds of control files are recognised by their path:
#
#   [dir/].sync/send/<name>     each line is "<login> <file>"
#   [dir/].sync/pending/<name>  each line is DELETED#<path>,
#                               MOVED#<from>#<to> or RENAMED#<from>#<to>
#
# Parameters:
#   $data - transfer record; ->{file} is read here, ->{pid} is needed by
#           blob_path and ->{login} by md5sum. The record is not modified.
#
# Returns 0 for control files (the caller skips dedup for them) and 1 for
# every other file.
sub modify_file {
	my ( $self,$data ) = @_;

	if ( $data->{file} =~ m{^(.*/)?\.sync/(send|pending)/([^/]+)$} ) {
		# the directory prefix is optional; use '' instead of undef so the
		# path concatenations below stay warning-free
		my ( $from_dir, $kind, $name ) = ( defined $1 ? $1 : '', $2, $3 );

		if ( $kind eq 'send' ) {
			$self->_sync_send( $data, $from_dir, $name );
		} else {
			$self->_sync_pending( $data, $from_dir, $name );
		}

		return 0; # skip dedup
	}

	#return $file->{size} > 4096 ? 1 : 0; # FIXME
	return 1; # dedup
}

# Process a .sync/send control file: hard-link every listed file into the
# recipient's "<home>/received/" directory and copy its md5 into the
# recipient's md5 database.
sub _sync_send {
	my ( $self, $data, $from_dir, $name ) = @_;

	warn "SEND $name from $from_dir\n";
	my $sent_files;
	open(my $send, '<', $self->blob_path($data) );
	while(<$send>) {
		s/[\n\r]+$//;

		my ( $to, $file ) = split(/\s+/,$_,2);
		if ( ! defined $file || $to eq '' || $file eq '' ) {
			warn "skip $_\n";
			next;
		}

		# element 7 of the passwd entry is the home directory
		my $dir = ( getpwnam($to) )[7];
		if ( ! defined $dir ) {
			# without this check the target would become "/received/..."
			warn "ERROR: unknown recipient $to\n";
			next;
		}

		# pass the path explicitly instead of overwriting $data->{file}:
		# $data is the caller's hash and is still used after this call
		my $from_path = $self->blob_path( $data => $from_dir . $file );

		if ( ! -r $from_path ) {
			warn "ERROR: $from_path: $!";
			next;
		}

		my $to_path = "$dir/received/$file";
		mkbasedir($to_path);

		warn "SEND $from_path -> $to_path\n";
		# link fails when the target already exists, so remove a stale copy
		unlink $to_path if -e $to_path;
		$sent_files->{$to} += link $from_path, $to_path;
		# FIXME cross-shard
		my $md5 = $self->md5sum($data)->get( $from_dir . $file );
		if ( $md5 ) {
			$self->md5sum({login => $to})->put( "/received/$file" => $md5 );
		} else {
			# do not store a bogus value when the sender has no md5
			warn "no md5 for $from_dir$file";
		}
	}
	close($send);

	warn "SENT ",dump $sent_files;
	return;
}

# Process a .sync/pending control file: replay deletions and moves/renames
# on the uploader's blob tree and keep the md5 database in step with moves.
sub _sync_pending {
	my ( $self, $data, $from_dir, $name ) = @_;

	warn "PENDIG $name from $from_dir";
	open(my $pend, '<', $self->blob_path($data) );
	while(<$pend>) {
		s/[\n\r]+$//;

		if ( m/^DELETED\#(.+)$/ ) {
			my $path = $self->blob_path($data => $from_dir . $1 );
			if ( -e $path ) {
				warn "UNLINK $path";
				# eval keeps a failed removal (e.g. non-empty directory) a
				# warning even if rmdir/unlink are made fatal by autodie
				my $removed = eval { -d $path ? rmdir($path) : unlink($path) };
				warn "ERROR: unlink $path ", ( $@ || $! ) unless $removed;
			} else {
				warn "MISSING $path to unlink";
			}
			next;
		} elsif ( ! /^(MOVED|RENAMED)\#/ ) {
			warn "skip $_\n";
			next;
		}

		my ( undef, $from, $to ) = split(/\#/,$_,3);
		if ( ! defined $to || $from eq '' || $to eq '' ) {
			# a move needs both a source and a destination
			warn "skip $_\n";
			next;
		}

		# explicit path argument: $data->{file} must stay untouched
		my ( $from_path, $to_path ) = map {
			$self->blob_path( $data => $from_dir . $_ )
		} ( $from, $to );

		if ( ! -e $from_path ) {
			warn "SKIPPED $from_path: $!";
			next;
		}

		warn "MV $from_path -> $to_path";
		mkbasedir($to_path);
		rename $from_path, $to_path;

		my $md5 = $self->md5sum($data)->get( $from_dir . $from );
		if ( ! $md5 ) {
			warn "ERROR: no md5sum $from_dir $from ";
			next;
		}

		# move the md5 record along with the file
		$self->md5sum($data)->out( $from_dir . $from );
		$self->md5sum($data)->put( $from_dir . $to => $md5 );

		warn "$md5 moved to $from_dir $to";
	}
	close($pend);

	return;
}
139
# never called by rsync directly!
#
# Hook invoked for a newly created file; currently a no-op (the file_set
# call below is disabled).
#
#   $data - file record (unused)
#
# Returns nothing.
sub new_file {
	my ( $self,$data ) = @_;
#	$self->file_set($data);

	# explicit bare return so the argument assignment is not the result
	return;
}
145
# React to rsync deleting a file: drop its md5 record and, when the md5
# pool copy has become the only remaining link, hand that copy over to the
# 'md5' system user.
#
#   $data - transfer record; ->{file} is the key in the md5 database,
#           ->{login} is needed by md5sum
#
# Returns nothing for entries without an md5 record, otherwise the result
# of removing the record from the md5 database.
sub remove_file {
	my ( $self, $data ) = @_;

	my $md5 = $self->md5sum($data)->get( $data->{file} );
	return unless $md5; # directories don't have md5sums

	my $path = $self->{md5pool} . '/' . $md5;

	# only link count (3) and group (5) of the stat result are needed
	my ( $nlink, $gid ) = ( stat($path) )[ 3, 5 ];

	if ( ! defined $nlink ) {
		# stat failed, nothing to chown
		warn "MISSING $path: $!";
	} elsif ( $nlink == 1 ) {
		my $id = getpwnam 'md5';
		if ( defined $id ) {
			chown $id,$gid, $path;
			warn "# chown $id $gid $path";
		} else {
			# an undefined uid would be passed to chown as 0
			warn "ERROR: no md5 user, owner of $path not changed";
		}
	}

	return $self->md5sum($data)->out( $data->{file} );
}
163
# Hook invoked by rsync_transfer for a created/changed directory;
# currently a no-op.
#
#   $data - transfer record (unused)
#
# Returns nothing.
sub make_dir {
	my ( $self, $data ) = @_;

	# explicit bare return so the argument assignment is not the result
	return;
}
168
# Handle an uploaded symlink. Only links whose path looks like
# "[dir/].send/<to>/<name>" are acted on: the link target (as munged by
# rsync with a "/rsyncd-munged/" prefix) is resolved relative to the link's
# directory and the file it points to is hard-linked to <name> inside the
# uploader's blob directory.
#
#   $data - transfer record; uses ->{file}, ->{pid} (blob_path) and
#           ->{login} (md5sum)
#
# Returns nothing. Problems are reported with warn.
sub new_link {
	my ( $self, $data ) = @_;

	warn "# new_link ",dump $data;

	return unless $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$};
	my ( $to, $name ) = ( $2, $3 );

	my $path = $self->blob_path($data);
	my $link_to = readlink $path;
	if ( ! defined $link_to ) {
		warn "ERROR: can't readlink $path: $!";
		return;
	}
	warn "$link_to";

	unless ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
		warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
		return;
	}

	my $s = $path;
	$s =~ s{/[^/]+$}{}; # strip filename
	# every literal "../" in the target climbs one directory; the dots are
	# escaped so ordinary two-character directory names are left alone
	while ( $link_to =~ s{/\.\./}{/} ) {
		$s =~ s{/[^/]+$}{} || die "can't strip $s";
		warn "## simplify $s $link_to\n";
	}
	$s .= $link_to;

	my $d = $self->blob_path({
		pid => $data->{pid},
		file => $name
	});

	# $name can contain directories so we must create them
	my $to_dir = $d;
	$to_dir =~ s{/[^/]+$}{};
	make_path($to_dir) if ! -e $to_dir;

	if ( ! -e $s ) {
		warn "ERROR: can't find source $s";
		return;
	}

	warn "link $s -> $d\n";
	link $s, $d;

	# declared separately from the conditional assignment: "my ... if" has
	# undefined behaviour when the condition is false
	my ( $owner, $owner_file );
	( $owner, $owner_file ) = ( $1, $2 ) if $s =~ m{users/([^/]+)/blob/(.+)};

#	my $origin = $self->file_get({
#		login => $owner,
#		file  => $owner_file,
#	});
#	$self->new_file($origin);
	warn "INFO: sent file ",dump($owner,$owner_file);
	my $md5 = $self->md5sum($data)->get($s);
	$self->md5sum({ login => $to })->put($d => $md5 );
	# FIXME broken!
	# NOTE(review): md5pool stores records under $data->{file}, while $s and
	# $d here are full paths from blob_path - confirm the intended key format

	return;
}
225
# Return the TokyoCabinet hash database holding the file => md5 records of
# one user, opening (and creating) "<home>/.md5" on first use. Handles are
# cached per login in $self->{md5sum}.
#
#   $data - hash with ->{login}
#
# Confesses when the login is missing or has no passwd entry; dies when the
# database cannot be opened.
sub md5sum {
	my ( $self, $data ) = @_;

	my $login = $data->{login} || confess "missing login in ",dump $data;

	return $self->{md5sum}->{$login} if exists $self->{md5sum}->{$login};

	# element 7 of the passwd entry is the home directory; without this
	# check an unknown login would open the database at "/.md5"
	my $dir = ( getpwnam($login) )[7];
	defined $dir or confess "no passwd entry for login $login";

	my $md5_path = "$dir/.md5";

	my $db = TokyoCabinet::HDB->new();
	$db->open($md5_path, $db->OWRITER | $db->OCREAT)
		or die "can't open $md5_path: ",$db->errmsg( $db->ecode );

	warn "open $md5_path";

	$self->{md5sum}->{$login} = $db;
	return $db;
}
247
# Track rsync sessions from rsync daemon log lines.
#
#   $data - one log line (string)
#
# A "[pid] rsync <word> <module> <word> <login>" line registers the session
# under its pid together with the passwd details of the login (anything
# from '@' on is cut off the login first). A "[pid] sent ... bytes
# received ... bytes" line closes every open md5 database and forgets the
# session. All other lines are ignored.
sub rsync_log {
	my ( $self, $data ) = @_;

	if ( my ( $pid, $module, $login ) = $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) {

		$login =~ s/\@.+//;

		my %session = ( login => $login );
		# passwd fields: 2 uid, 3 gid, 6 gcos (kept as email), 7 home, 8 shell
		@session{qw( uid gid email dir shell )} = ( getpwnam $login )[ 2, 3, 6, 7, 8 ];

		$self->{pid}->{$pid} = \%session;

		warn "created $pid";

	} elsif ( my ( $done_pid ) = $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {

		# end of a transfer: release all cached md5 database handles
		for my $login ( keys %{ $self->{md5sum} } ) {
			my $db = $self->{md5sum}->{$login};
			$db->close;
			warn "close md5sum $login";
		}
		delete $self->{md5sum};

		delete $self->{pid}->{$done_pid};
		warn "removed $done_pid";
		warn dump $self;

	} else {
#		warn "## rsync_log $data";
	}
}
285
# Build the filesystem path of a file inside the directory registered for
# an rsync session.
#
#   $data - hash with ->{pid} (session registered by rsync_log) and ->{file}
#   $path - optional relative path used instead of $data->{file}
#
# Dies when no directory is known for the pid.
sub blob_path {
	my ( $self, $data, $path ) = @_;

	my $base = $self->{pid}->{ $data->{pid} }->{dir}
		|| die "no dir for $data->{pid} in ",dump( $self->{pid} );

	# an explicit $path wins over the file named in $data
	my $relative = defined $path ? $path : $data->{file};

	return "$base/$relative";
}
292
293
# Dispatch one itemized rsync transfer record to the matching handler.
#
#   $data - transfer record with ->{itemize} (rsync itemize string),
#           ->{pid} and ->{file}
#
# Received changes/creations ('c' or '>' followed by a type letter) go to
# modify_file + dedup (f), make_dir (d) or new_link (L); "*deleting"
# records go to remove_file; everything else is ignored (and reported when
# DEBUG is set).
#
# Returns $data.
sub rsync_transfer {
	my ( $self,$data ) = @_;

	# resolved up front, so a record for an unknown pid dies here
	my $path = $self->blob_path($data);

	if ( my ( $type ) = $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create

		if ( $type eq 'L' ) {
			$self->new_link( $data );
		} elsif ( $type eq 'd' ) {
			$self->make_dir( $data );
		} elsif ( $type eq 'f' ) {
			# selective dedup: modify_file decides whether dedup runs
			if ( $self->modify_file( $data ) ) {
				$self->dedup( $data, $path );
			}
		} else {
			die "unknown type $type ", dump $data;
		}

	} elsif ( $data->{itemize} =~ m/\*deleting/ ) {
		$self->remove_file($data);
	} elsif ( $ENV{DEBUG} ) {
		warn "IGNORED ",dump($data);
	}

	return $data;
}
319
# Share an uploaded file with the md5 pool via hard links and record its
# md5 in the owner's md5 database.
#
#   $data - hash with ->{md5}, ->{file}, ->{pid} (blob_path) and
#           ->{login} (md5sum)
#
# When the pool already has an entry for the md5, the uploaded file is
# replaced by a hard link to that entry; otherwise the uploaded file is
# linked into the pool as the new entry.
#
# Dies when the pool directory or the md5 is missing, or when the file
# cannot be linked (the uploaded file is put back first in that case).
sub md5pool {
	my ( $self, $data ) = @_;

	my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
	mkdir $pool unless -e $pool;

	my $md5 = $data->{md5} || die "no md5 in ",dump $data;
	my $path = $self->blob_path($data);

	if ( -e "$pool/$md5" ) {
		warn "dedup hit $md5 $path\n";
		my $dedup = $path . '.dedup';
		rename $path, $dedup;
		if ( ! eval { link "$pool/$md5", $path } ) {
			# the upload was already moved aside; move it back so a
			# failed link does not leave the user without the file
			my $err = $@ || "can't link $pool/$md5 to $path: $!";
			eval { rename $dedup, $path }
				or warn "ERROR: can't restore $path from $dedup: ", ( $@ || $! );
			die $err;
		}
		unlink $dedup;
		# FIXME fix perms?
	} else {
		link $path, "$pool/$md5";
		warn "dedup +++ $md5 $path";
	}

	return $self->md5sum($data)->put( $data->{file} => $md5 );
}
343
# md5 value (32 spaces) that marks a record without a usable checksum
my $empty_md5 = " " x 32;

# Deduplicate an uploaded file through the md5 pool.
#
#   $data - transfer record with ->{file}, ->{md5}, ->{pid} and ->{login}
#   $path - filesystem path of the uploaded file
#
# A file named "md5sum" is additionally treated as an import list: each
# line is "<md5> <file>", and every listed file whose md5 is already in the
# pool is created next to the list as a hard link to the pool entry (when
# it does not exist yet) and registered via md5pool.
#
# Afterwards the uploaded file itself goes through md5pool unless its md5
# is the empty marker.
sub dedup {
	my ( $self, $data, $path ) = @_;

	if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
		# directory prefix is optional; '' keeps "$dir$file" warning-free
		my $dir = defined $1 ? $1 : '';
		# the pool location is stored by the constructor as {md5pool}
		my $pool = $self->{md5pool};
		my $imported = 0;
		warn "IMPORT ", $data->{file}, "\n";
		open(my $md5sum, '<', $path);
		while(<$md5sum>) {
			chomp;
			my ( $md5, $file ) = split(/\s+/,$_,2);
			# blank or malformed line
			next unless defined $file && $md5 ne '' && $file ne '';

			if ( ! -e "$pool/$md5" ) {
				warn "MISSING $md5 $file\n";
				next;
			}
			my $new = {
				pid => $data->{pid},
				login => $data->{login}, # md5pool needs it to find the md5 db
				file => "$dir$file",
				md5 => $md5,
			};
			my $new_path = $self->blob_path($new);
			if ( ! -e $new_path ) {
				# create path from md5sum file
				( my $only_dir = $new_path ) =~ s{/[^/]+$}{};
				make_path($only_dir) unless -d $only_dir;
				$imported += link "$pool/$md5", $new_path;
				$self->new_file($new);
				warn "import from $path ",dump($new);
			}
			$self->md5pool( $new );
		}
		close($md5sum);
		print "INFO imported $imported files from ",dump($data);
	}

	if ( $data->{md5} ne $empty_md5 ) {
		$self->md5pool( $data );
	} else {
		warn "empty md5", dump $data;
	}
}
388
389 1;