use slice name and get dir from group
[cloudstore.git] / lib / CloudStore / Store.pm
1 package CloudStore::Store;
2 use warnings;
3 use strict;
4
5 use lib 'lib';
6 use CloudStore::API;
7
8 use autodie;
9 use JSON::XS;
10 use File::Path qw(make_path);
11 use File::Slurp qw();
12 use Digest::MD5 qw(md5_base64);
13 use Data::Dump qw(dump);
14 use Carp qw(confess);
15 use TokyoCabinet;
16
17 use WarnColor;
18
# Constructor.
#
#   $group - passed unchanged to CloudStore::API->new
#
# The returned object keeps the API handle in {api} and the md5 pool
# directory (read from the API object's {md5}{dir}) in {md5pool}.
sub new {
        my ( $class, $group ) = @_;

        my $api  = CloudStore::API->new( $group );
        my $self = bless { api => $api }, $class;

        $self->{md5pool} = $api->{md5}->{dir};

        # dump the freshly built object only when debugging is requested
        if ( $ENV{DEBUG} ) {
                warn "# new ",dump $self;
        }

        return $self;
}
33
# Placeholder: accepts the object and a data record but stores nothing yet.
sub user_set {
        my ( $self, $data ) = @_;
}
37
# Placeholder: accepts the object and a data record but looks nothing up yet.
sub user_get {
        my ( $self, $data ) = @_;
}
41
# Make sure the directory that will hold the given file path exists.
#
#   $dir - path of a file; its last component (the filename) is stripped
#          and the remaining directory is created
#
# Missing intermediate directories are created as well, because callers
# pass names that may contain several directory levels.  A path without
# any '/' has no directory part, so nothing is created for it.
# make_path() croaks if the directory cannot be created.
sub mkbasedir {
        my $dir = shift;
        return unless defined $dir;

        # strip filename; no match means there is no directory component
        return unless $dir =~ s{/[^/]+$}{};

        # '' is what remains of "/file": the root directory always exists
        return if $dir eq '' || -e $dir;

        make_path( $dir );
}
47
# Process the control files uploaded below a .sync/ directory and tell the
# caller whether the received file should be deduplicated.
#
#   $data - transfer record (hash ref); {file} is inspected here and the
#           record is handed to blob_path() and md5sum(), which read
#           {pid} and {login}.  The record itself is never modified.
#
# Two kinds of control file are recognised by name:
#
#   .sync/send/<name>     every line is "<login> <file>".  <file> (relative
#                         to the directory holding .sync) is hard-linked to
#                         <login>'s home as received/<file> and its md5 is
#                         recorded in <login>'s md5 database.
#   .sync/pending/<name>  every line is DELETED#<path>, MOVED#<from>#<to>
#                         or RENAMED#<from>#<to>; the operation is replayed
#                         on the stored files and the md5 database.
#
# Returns 0 for control files (skip dedup), 1 for every other file.
sub modify_file {
        my ( $self,$data ) = @_;

        if ( $data->{file} =~ m{^(.*/)?\.sync/send/([^/]+)$} ) {
                # the directory prefix is optional, so $1 may be undefined
                my $from_dir = defined $1 ? $1 : '';
                warn "SEND $2 from $from_dir\n";
                my $sent_files;
                open(my $send, '<', $self->blob_path($data) );
                while(<$send>) {
                        s/[\n\r]+$//;

                        my ( $to, $file ) = split(/\s+/,$_,2);
                        if ( ! defined $to || $to eq '' || ! defined $file || $file eq '' ) {
                                warn "skip $_\n";
                                next;
                        }

                        # home directory of the recipient
                        my $dir = ( getpwnam($to) )[7];
                        if ( ! defined $dir ) {
                                warn "ERROR: unknown recipient $to\n";
                                next;
                        }

                        # pass the path explicitly instead of changing
                        # $data->{file}, which belongs to the caller
                        my $from_path = $self->blob_path( $data => $from_dir . $file );

                        if ( ! -r $from_path ) {
                                warn "ERROR: $from_path: $!";
                                next;
                        }

                        my $to_path = "$dir/received/$file";
                        mkbasedir( $to_path );

                        warn "SEND $from_path -> $to_path\n";
                        # link() refuses to replace an existing file
                        unlink $to_path if -e $to_path;
                        $sent_files->{$to} += link $from_path, $to_path;
                        # FIXME cross-shard
                        my $md5 = $self->md5sum($data)->get( $from_dir . $file );
                        if ( ! $md5 ) {
                                # never store a bogus value for the recipient
                                warn "no md5 for $from_dir$file";
                                next;
                        }
                        $self->md5sum({login => $to})->put( "/received/$file" => $md5 );
                }
                close $send;

                warn "SENT ",dump $sent_files;

                return 0; # skip dedup
        } elsif ( $data->{file} =~ m{^(.*/)?\.sync/pending/([^/]+)$} ) {
                my $from_dir = defined $1 ? $1 : '';
                warn "PENDIG $2 from $from_dir";
                open(my $pend, '<', $self->blob_path($data) );
                while(<$pend>) {
                        s/[\n\r]+$//;

                        if ( m/^DELETED\#(.+)$/ ) {
                                my $path = $self->blob_path($data => $from_dir . $1 );
                                if ( -e $path ) {
                                        warn "UNLINK $path";
                                        # a failed removal is logged, it must not
                                        # abort the rest of the pending list
                                        eval {
                                                ( -d $path ? rmdir($path) : unlink($path) )
                                                        or die "$!\n";
                                                1;
                                        } or warn "ERROR: unlink $path $@";
                                } else {
                                        warn "MISSING $path to unlink";
                                }
                                next;
                        } elsif ( ! /^(MOVED|RENAMED)\#/ ) {
                                warn "skip $_\n";
                                next;
                        }

                        my ( undef, $from, $to ) = split(/\#/,$_,3);
                        if ( ! defined $to ) {
                                # MOVED/RENAMED without a destination
                                warn "skip $_\n";
                                next;
                        }

                        my ( $from_path, $to_path ) = map {
                                $self->blob_path( $data => $from_dir . $_ )
                        } ( $from, $to );

                        if ( ! -e $from_path ) {
                                warn "SKIPPED $from_path: $!";
                                next;
                        }

                        warn "MV $from_path -> $to_path";
                        mkbasedir( $to_path );
                        rename $from_path, $to_path;

                        my $md5 = $self->md5sum($data)->get( $from_dir . $from );
                        if ( ! $md5 ) {
                                warn "ERROR: no md5sum $from_dir $from ";
                                next;
                        }

                        # move the md5 record along with the file
                        $self->md5sum($data)->out( $from_dir . $from );
                        $self->md5sum($data)->put( $from_dir . $to => $md5 );

                        warn "$md5 moved to $from_dir $to";
                }
                close $pend;

                return 0; # skip dedup
        }

        #return $file->{size} > 4096 ? 1 : 0; # FIXME
        return 1; # dedup
}
143
# never called by rsync directly!
#
# Hook for a file that has just been added to the store.  It currently
# does nothing: the file_set call below is disabled.
sub new_file {
        my ( $self, $data ) = @_;
        # $self->file_set($data);
}
149
# Forget the md5 record of a file that was deleted.
#
#   $data - transfer record; {file} is the key in the md5 database and the
#           record is passed to md5sum() to select that database
#
# Entries without an md5 (directories) are ignored.  If the pooled copy
# "<md5pool>/<md5>" has exactly one link left, its owner is changed to the
# 'md5' system user (group unchanged).  The md5 record is removed in every
# case where one existed, even when the pooled copy cannot be inspected.
sub remove_file {
        my ( $self, $data ) = @_;

        my $md5 = $self->md5sum($data)->get( $data->{file} );
        return unless $md5; # directories don't have md5sums

        my $path = $self->{md5pool} . '/' . $md5;

        # only link count and group are needed from stat
        my ( $nlink, $gid ) = ( stat($path) )[ 3, 5 ];

        if ( ! defined $nlink ) {
                warn "ERROR: stat $path: $!";
        } elsif ( $nlink == 1 ) {
                my $id = getpwnam 'md5';
                if ( defined $id ) {
                        chown $id,$gid, $path;
                        warn "# chown $id $gid $path";
                } else {
                        # an undefined uid would be treated as 0 by chown
                        warn "ERROR: no md5 user, owner of $path unchanged";
                }
        }

        $self->md5sum($data)->out( $data->{file} );
}
167
# Hook invoked for a received directory; nothing needs to be done for it.
sub make_dir {
        my ( $self, $data ) = @_;
}
172
# Handle a received symlink.
#
#   $data - transfer record; {file} is matched against
#           "<dir>.send/<to>/<name>", {pid} selects the blob directory
#
# Only symlinks below a ".send/<to>/" directory are processed.  The link
# target must start with "/rsyncd-munged/"; after removing that prefix
# every "/../" in the target moves one directory up from the directory
# containing the symlink, which yields the source file.  The source is
# then hard-linked to <name> inside the blob directory and its md5 is
# recorded for <to>.
#
# FIXME broken! the md5 lookup and store below use full paths as keys
# (left unchanged here).
sub new_link {
        my ( $self, $data ) = @_;

        warn "# new_link ",dump $data;

        if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
                my ( $to, $name ) = ( $2, $3 );
                my $path = $self->blob_path($data);
                my $link_to = readlink $path;
                warn "$link_to";
                if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {

                        my $source = $path;
                        $source =~ s{/[^/]+$}{}; # strip filename
                        # dots are escaped so that only a real ".." component
                        # is collapsed, not any two-character directory name
                        while ( $link_to =~ s{/\.\./}{/} ) {
                                $source =~ s{/[^/]+$}{} || die "can't strip $source";
                                warn "## simplify $source $link_to\n";
                        }
                        $source .= $link_to;

                        my $dest = $self->blob_path({
                                pid => $data->{pid},
                                file => $name
                        });

                        # $name can contain directories so we must create them
                        mkbasedir( $dest );

                        if ( ! -e $source ) {
                                warn "ERROR: can't find source $source";
                        } else {

                                warn "link $source -> $dest\n";
                                link $source, $dest;

                                # declared separately: "my ... if COND" has
                                # undefined behaviour when COND is false
                                my ( $login, $file );
                                ( $login, $file ) = ( $1, $2 )
                                        if $source =~ m{users/([^/]+)/blob/(.+)};

#                               my $origin = $self->file_get({
#                                       login => $login,
#                                       file  => $file,
#                               });
#                               $self->new_file($origin);
                                warn "INFO: sent file ",dump($login,$file);
                                my $md5 = $self->md5sum($data)->get($source);
                                if ( $md5 ) {
                                        $self->md5sum({ login => $to })->put($dest => $md5 );
                                } else {
                                        # do not store an undefined checksum
                                        warn "ERROR: no md5 for $source";
                                }
                        }

                } else {
                        warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
                }
        }
}
227
# Return the md5 database of a user, opening it on first use.
#
#   $data - hash ref; only {login} is read
#
# The database is the TokyoCabinet hash file ".md5" in the user's home
# directory (taken from the passwd entry).  It is opened for writing,
# created when missing, and cached in $self->{md5sum}{<login>} so later
# calls return the same handle.
#
# Dies (with a stack trace) when {login} is missing or is not a known
# user, and when the database cannot be opened.
sub md5sum {
        my ( $self, $data ) = @_;

        my $login = $data->{login} || confess "missing login in ",dump $data;

        return $self->{md5sum}->{$login} if exists $self->{md5sum}->{$login};

        # without this check an unknown user would open "/.md5"
        my $dir = ( getpwnam($login) )[7];
        confess "unknown login $login" unless defined $dir;

        my $md5_path = "$dir/.md5";

        my $db = TokyoCabinet::HDB->new();
        $db->open($md5_path, $db->OWRITER | $db->OCREAT)
                or die "can't open $md5_path: ",$db->errmsg( $db->ecode );

        warn "open $md5_path";

        $self->{md5sum}->{$login} = $db;
        return $db;
}
249
# Remember which user a process id belongs to.
#
#   $pid   - process id used as key in $self->{pid}
#   $login - login name; everything from the first '@' on is dropped
#
# Stores login, uid, gid, email (the passwd gcos field), dir and shell of
# that user under $self->{pid}{$pid}.
sub init_pid_login {
        my ( $self, $pid, $login ) = @_;

        $login =~ s/\@.+//;

        my %account = ( login => $login );
        # passwd fields: 2 = uid, 3 = gid, 6 = gcos, 7 = home, 8 = shell
        @account{qw( uid gid email dir shell )} =
                ( getpwnam($login) )[ 2, 3, 6, 7, 8 ];

        $self->{pid}->{$pid} = \%account;

        warn "created $pid";
}
268
# Release everything kept for a finished process.
#
#   $pid - process id whose entry is removed from $self->{pid}
#
# Every cached md5 database is closed and the whole cache is dropped,
# not only the one belonging to this process.
sub cleanup_pid {
        my ( $self, $pid ) = @_;

        # FIXME only login's?
        for my $login ( keys %{ $self->{md5sum} } ) {
                my $db = $self->{md5sum}->{$login};
                $db->close;
                warn "close md5sum $login";
        }

        delete $self->{md5sum};
        delete $self->{pid}->{$pid};

        warn "removed $pid";
}
281
# Track process ids from rsync log lines.
#
#   $data - one log line
#
# A line of the form "[<pid>] rsync <word> <module> <word> <login>"
# registers the login for that pid; a line of the form
# "[<pid>] sent <n> bytes  received <n> bytes" cleans the pid up again.
# All other lines are ignored.
sub rsync_log {
        my ( $self, $data ) = @_;

        my $session_start = qr/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/;
        my $session_end   = qr/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/;

        if ( $data =~ $session_start ) {
                # the module name ($2) is matched but not needed
                my ( $pid, $login ) = ( $1, $3 );
                $self->init_pid_login( $pid, $login );
        } elsif ( $data =~ $session_end ) {
                my $pid = $1;
                $self->cleanup_pid( $pid );
        } else {
#               warn "## rsync_log $data";
        }
}
294
# Build the full path of a file inside a user's directory.
#
#   $data - hash ref; {pid} selects the directory registered for that
#           process, {file} is the default relative path
#   $path - optional relative path used instead of $data->{file}
#
# Returns "<dir>/<relative path>".  Dies when no directory is known for
# the pid.
sub blob_path {
        my ( $self, $data, $path ) = @_;

        my $base = $self->{pid}->{ $data->{pid} }->{dir}
                || die "no dir for $data->{pid} in ",dump( $self->{pid} );

        my $relative = defined $path ? $path : $data->{file};

        return $base . '/' . $relative;
}
301
302
# Dispatch one transferred item according to its itemize string.
#
#   $data - transfer record; {itemize} decides what happens
#
# Itemize strings starting with 'c' or '>' followed by a type letter are
# received changes: 'f' (file) goes through modify_file() and, when that
# returns true, dedup(); 'd' goes to make_dir(); 'L' goes to new_link().
# A string containing "*deleting" goes to remove_file().  Anything else is
# ignored (and dumped when DEBUG is set).
#
# Returns $data.
sub rsync_transfer {
        my ( $self,$data ) = @_;

        my $path = $self->blob_path($data);
        my $itemize = $data->{itemize};

        if ( $itemize =~ m/^[c>]([fdL])/ ) { # received change/create
                my $type = $1;

                my %handler_for = (
                        # selective dedup: modify_file decides
                        f => sub { $self->dedup( $data, $path ) if $self->modify_file( $data ) },
                        d => sub { $self->make_dir( $data ) },
                        L => sub { $self->new_link( $data ) },
                );

                my $handler = $handler_for{$type};
                die "unknown type $type ", dump $data unless $handler;
                $handler->();
        } elsif ( $itemize =~ m/\*deleting/ ) {
                $self->remove_file($data);
        } elsif ( $ENV{DEBUG} ) {
                warn "IGNORED ",dump($data);
        }

        return $data;
}
328
# Share a stored file with the md5 pool through hard links.
#
#   $data - transfer record; {md5} names the pool entry, {file} is the key
#           for the md5 database, and the record is passed to blob_path()
#           and md5sum()
#
# The pool directory is created when missing.  If "<pool>/<md5>" already
# exists, the stored file is replaced by a hard link to it (dedup hit);
# otherwise the stored file is linked into the pool as a new entry.  In
# both cases the md5 is recorded for {file}.
#
# Dies when no pool directory is configured or $data has no md5.
sub md5pool {
        my ( $self, $data ) = @_;

        my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
        mkdir $pool unless -e $pool;

        my $md5 = $data->{md5} || die "no md5 in ",dump $data;
        my $path = $self->blob_path($data);
        my $pooled = "$pool/$md5";

        if ( -e $pooled ) {
                warn "dedup hit $md5 $path\n";
                my $dedup = $path . '.dedup';
                rename $path, $dedup;
                # if linking fails, move the original back so the file is
                # never left only under its temporary name
                my $linked = eval {
                        link( $pooled, $path ) or die "link $pooled $path: $!\n";
                        1;
                };
                if ( ! $linked ) {
                        my $error = $@;
                        eval { rename $dedup, $path };
                        die $error;
                }
                unlink $dedup;
                # FIXME fix perms?
        } else {
                link $path, $pooled;
                warn "dedup +++ $md5 $path";
        }

        $self->md5sum($data)->put( $data->{file} => $md5 );
}
352
# md5 value (32 spaces) that marks an item without a checksum
my $empty_md5 = " " x 32;

# Deduplicate a received file and import files listed in md5sum files.
#
#   $data - transfer record; {file}, {md5}, {pid} and {login} are used
#   $path - full path of the received file
#
# When the file is named "md5sum" (in any directory), every line of it is
# read as "<md5> <file>".  For each md5 that exists in the pool, <file>
# (relative to the directory of the md5sum file) is created as a hard link
# to the pool entry if it does not exist yet, and then registered through
# md5pool().  Lines whose md5 is not in the pool are reported and skipped.
#
# Afterwards the received file itself is recorded in the md5 database and
# passed to md5pool(), unless it has no (or the empty) md5.
sub dedup {
        my ( $self, $data, $path ) = @_;

        if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
                # no directory prefix for an md5sum file at top level
                my $dir = defined $1 ? $1 : '';
                # the pool directory is stored under {md5pool}
                my $pool = $self->{md5pool};
                my $imported = 0;
                warn "IMPORT ", $data->{file}, "\n";
                open(my $md5sum, '<', $path);
                while(<$md5sum>) {
                        chomp;
                        my ( $md5, $file ) = split(/\s+/,$_,2);
                        # blank or incomplete line
                        next unless defined $md5 && $md5 ne ''
                                && defined $file && $file ne '';
                        if ( ! -e "$pool/$md5" ) {
                                warn "MISSING $md5 $file\n";
                                next;
                        }
                        my $new = {
                                pid => $data->{pid},
                                # md5pool() needs the login to find the
                                # md5 database
                                login => $data->{login},
                                file => "$dir$file",
                                md5 => $md5,
                        };
                        my $new_path = $self->blob_path($new);
                        if ( ! -e $new_path ) {
                                # create path from md5sum file
                                mkbasedir( $new_path );
                                $imported += link( "$pool/$md5", $new_path );
                                $self->new_file($new);
                                warn "import from $path ",dump($new);
                        }
                        $self->md5pool( $new );
                }
                close $md5sum;
                print "INFO imported $imported files from ",dump($data);
        }

        if ( defined $data->{md5} && $data->{md5} ne $empty_md5 ) {
                $self->md5sum($data)->put( $data->{file} => $data->{md5} );
                $self->md5pool( $data );
        } else {
                warn "empty md5", dump $data;
        }
}
397
398 1;