52c7ee7a5f2d371e57ee3e8555c1acecf02190d3
[cloudstore.git] / lib / CloudStore / Store.pm
1 package CloudStore::Store;
2 use warnings;
3 use strict;
4
5 use autodie;
6 use JSON::XS;
7 use File::Path qw(make_path);
8 use File::Slurp qw();
9 use Digest::MD5 qw(md5_base64);
10 use Data::Dump qw(dump);
11 use Carp qw(confess);
12 use BerkeleyDB;
13
14 use WarnColor;
15
16 sub new {
17         my $class = shift;
18
19         my $self = {@_};
20         bless $self, $class;
21
22         die "no dir" unless $self->{dir};
23         $self->{md5pool} = $self->{dir} . '/md5';
24
25         warn "# new ",dump $self if $ENV{DEBUG};
26
27         return $self;
28 }
29
30 sub user_set {
31         my ( $self,$data ) = @_;
32 }
33
34 sub user_get {
35         my ( $self,$data ) = @_;
36 }
37
38 sub modify_file {
39         my ( $self,$data ) = @_;
40
41         if ( $data->{file} =~ m{^(.*/).sync/send/([^/]+)$} ) {
42                 my $from_dir = $1;
43                 warn "SEND $2 from $from_dir";
44                 my $sent_files;
45                 open(my $send, '<', $self->blob_path($data) );
46                 while(<$send>) {
47 warn $_;
48                         chomp;
49                         my ( $to, $file ) = split(/\s+/,$_,2);
50                         my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
51                                 getpwnam $to;
52
53                         my $from = $data;
54                         $from->{file} = $from_dir . $file;
55                         my $from_path = $self->blob_path($from);
56
57                         if ( ! -r $from_path ) {
58                                 warn "ERROR: $from_path: $!";
59                                 next;
60                         }
61
62                         my $to_path = "$dir/received/$file";
63                         my $to_dir = $to_path;
64                         $to_dir =~ s{/[^/]+$}{};
65                         mkdir $to_dir unless -e $to_dir;
66
67                         warn "SEND $from_path -> $to_path";
68                         $sent_files->{$to} += link $from_path, $to_path;
69                         # FIXME cross-shard
70                 }
71
72                 warn "SENT ",dump $sent_files;
73
74                 return 0; # skip dedup
75         }
76
77         #return $file->{size} > 4096 ? 1 : 0; # FIXME
78         return 1; # dedup
79 }
80
81 # never called by rsync directly!
82 sub new_file {
83         my ( $self,$data ) = @_;
84 #       $self->file_set($data);
85 }
86
87 sub remove_file {
88         my ( $self, $data ) = @_;
89
90         my $md5sum = $self->md5sum($data);
91         my $md5 = $md5sum->{ $data->{file} } || return; # directories don't have md5sums
92         my $path = $self->{md5pool} . '/' . $md5;
93         my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
94                 $atime,$mtime,$ctime,$blksize,$blocks)
95                         = stat($path);
96         if ( $nlink == 1 ) {
97                 my $id = getpwnam 'md5';
98                 chown $id,$gid, $path;
99                 warn "# chown $id $gid $path";
100         }
101 }
102
103 sub make_dir {
104         my ( $self, $data ) = @_;
105
106 }
107
108 sub new_link {
109         my ( $self, $data ) = @_;
110
111         warn "# new_link ",dump $data;
112
113         if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
114                 my ( $dir, $to, $name ) = ( $1, $2, $3 );
115                 my $path = $self->blob_path($data);
116                 my $link_to = readlink $path;
117                 warn "$link_to";
118                 if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
119
120                         my $s = $path;
121                         $s =~ s{/[^/]+$}{}; # strip filename
122                         while ( $link_to =~ s{/../}{/} ) {
123                                 $s =~ s{/[^/]+$}{} || die "can't strip $s";
124                                 warn "## simplify $s $link_to\n";
125                         }
126                         $s .= $link_to;
127
128                         my $d = $self->blob_path({
129                                 pid => $data->{pid},
130                                 file => $name
131                         });
132
133                         # $name can contain directories so we must create them
134                         my $to_dir = $d;
135                         $to_dir =~ s{/[^/]+$}{};
136                         make_path $to_dir if ! -e $to_dir;
137
138                         if ( ! -e $s ) {
139                                 warn "ERROR: can't find source $s";
140                         } else {
141
142                                 warn "link $s -> $d\n";
143                                 link $s, $d;
144
145                                 my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
146
147 #                               my $origin = $self->file_get({
148 #                                       login => $l,
149 #                                       file  => $f,
150 #                               });
151 #                               $self->new_file($origin);
152                                 warn "INFO: sent file ",dump($l,$f);
153                                 my $md5sum = $self->md5sum($data);
154
155                                 my $md5 = $md5sum->{$s} || die "no md5 for $s";
156                                 $md5sum->{$d} = $md5; # FIXME broken!
157                         }
158
159
160                 } else {
161                         warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
162                 }
163         }
164 }
165
166 our $md5_login;
167 sub md5sum {
168         my ( $self, $data ) = @_;
169
170         if ( exists $md5_login->{$data->{login}} ) {
171                 return $md5_login->{$data->{login}};
172         } elsif ( my $login = $data->{login} ) {
173
174                 my $md5_path = $self->{dir} || die "no dir?";
175                 $login =~ s/^u//;
176                 $md5_path .= "/$login/.md5.db";
177
178                 my %md5;
179                 my $db = tie %md5, 'BerkeleyDB::Hash',
180                         -Filename => $md5_path,
181                         -Flags => DB_CREATE,
182                 ;
183
184                 return $md5_login->{$login} = \%md5;
185         } else {
186                 confess "can't open md5sum";
187         }
188 }
189
190 sub rsync_log {
191         my ( $self, $data ) = @_;
192         if ( $data =~ m/\[(\d+)\] rsync \w+ (\S+) \w+ (\S+)/ ) {
193                 my ( $pid, $module, $login ) = ( $1, $2, $3 );
194
195                 $login =~ s/\@.+//;
196                 my ( undef, undef, $uid, $gid, undef, undef, $email, $dir, $shell ) =
197                         getpwnam $login;
198
199                 $self->{pid}->{$pid} = {
200                         login => $login,
201                         uid => $uid,
202                         gid => $gid,
203                         email => $email,
204                         dir => $dir,
205                         shell => $shell,
206                 };
207
208                 warn "created $pid";
209
210         } elsif ( $data =~ m/\[(\d+)\] sent \S+ bytes\s+received \S+ bytes/ ) {
211                 my $pid = $1;
212                 untie $md5_login->{ $self->{$pid}->{login} } && warn "untie $pid";
213                 delete $self->{pid}->{$pid};
214                 warn "removed $pid";
215         } else {
216 #               warn "## rsync_log $data";
217         }
218 }
219
220 sub blob_path {
221         my ( $self, $data ) = @_;
222         my $blob = $self->{pid}->{ $data->{pid} }->{dir} || die "no dir for $data->{pid} in ",dump( $self->{pid} );
223         $blob .= '/' . $data->{file};
224         return $blob;
225 }
226
227
228 sub rsync_transfer {
229         my ( $self,$data ) = @_;
230
231         my $path = $self->blob_path($data);
232
233         if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
234                 my $type = $1;
235
236                 if ( $type eq 'f' ) {
237                         $self->modify_file( $data ) && # selective dedup
238                         $self->dedup( $data, $path );
239                 } elsif ( $type eq 'd' ) {
240                         $self->make_dir( $data );
241                 } elsif ( $type eq 'L' ) {
242                         $self->new_link( $data );
243                 } else {
244                         die "unknown type $type ", dump $data;
245                 }
246         } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
247                 $self->remove_file($data);
248         }
249         return $data;
250 }
251
252 sub md5pool {
253         my ( $self, $data ) = @_;
254
255         my $pool = $self->{md5pool} || die "no md5pool in ",dump $self;
256         mkdir $pool unless -e $pool;
257
258         my $md5 = $data->{md5} || die "no md5 in ",dump $data;
259         my $path = $self->blob_path($data);
260
261         if ( -e "$pool/$md5" ) {
262                 warn "dedup hit $md5 $path\n";
263                 my $dedup = $path . '.dedup';
264                 rename $path, $dedup;
265                 link "$pool/$md5", $path;
266                 unlink $dedup;
267                 # FIXME fix perms?
268         } else {
269                 link $path, "$pool/$md5";
270         }
271
272         my $md5sum = $self->md5sum($data);
273         $md5sum->{ $data->{file} } = $md5;
274 }
275
276 my $empty_md5 = " " x 32;
277
278 sub dedup {
279         my ( $self, $data, $path ) = @_;
280
281         if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
282                 my $dir = $1;
283                 my $imported = 0;
284                 warn "IMPORT ", $data->{file}, "\n";
285                 open(my $md5sum, '<', $path);
286                 while(<$md5sum>) {
287                         chomp;
288                         my ( $md5, $file ) = split(/\s+/,$_,2);
289                         if ( ! -e "$self->{md5path}/$md5" ) {
290                                 warn "MISSING $md5 $file\n";
291                                 next;
292                         }
293                         my $new = {
294                                 pid => $data->{pid},
295                                 file => "$dir$file",
296                                 md5 => $md5,
297                         };
298                         my $new_path = $self->blob_path($new);
299                         if ( ! -e $new_path ) {
300                                 # create path from md5sum file
301                                 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
302                                 make_path $only_dir unless -d $only_dir;
303                                 $imported += link "$self->{md5path}/$md5", $new_path;
304                                 $self->new_file($new);
305                                 warn "import from $path ",dump($new);
306                                 $self->md5pool( $new );
307                         } else {
308                                 $self->md5pool( $new );
309                         }
310                 }
311                 print "INFO imported $imported files from ",dump($data);
312         }
313
314         if ( $data->{md5} ne $empty_md5 ) {
315                 $self->md5pool( $data );
316         } else {
317                 warn "empty md5", dump $data;
318         }
319 }
320
321 1;