1 package CloudStore::Store;
7 use File::Path qw(make_path);
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
# Fragment of what appears to be the constructor (the "# new" trace below
# suggests so; the sub header and several closing lines are not visible in
# this excerpt). For each bucket name in %$buckets a Cache::Memcached client
# is created for 127.0.0.1 on that bucket's port and stored in $self under
# the bucket name.
29 foreach my $bucket ( keys %$buckets ) {
30 my $port = $buckets->{$bucket};
# NOTE(review): indirect object syntax; Cache::Memcached->new({ ... }) is the
# unambiguous spelling.
31 my $server = new Cache::Memcached {
32 'servers' => [ "127.0.0.1:$port" ],
# client-level debug is switched on only when DEBUG is defined and above 3
33 'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
34 # 'compress_threshold' => 10_000,
36 #$server->set_servers($array_ref);
37 #$server->set_compress_threshold(10_000);
# presumably turns client-side compression off -- confirm against the
# Cache::Memcached documentation
38 $server->enable_compress(0);
39 $self->{$bucket} = $server;
# dump the assembled object when DEBUG is set
43 warn "# new ",dump $self if $ENV{DEBUG};
# Body of the sub its trace message calls json_set (sub header not visible in
# this excerpt). Arguments: $self, bucket name, key, and a reference that is
# JSON-encoded and stored under $key in the client kept at $self->{$bucket}.
49 my ($self,$bucket,$key,$data) = @_;
# only references are accepted; anything else aborts with a stack trace
50 confess "data not ref ",dump($data) unless ref $data;
51 my $json = encode_json $data;
52 $self->{$bucket}->set( $key => $json );
# NOTE(review): this trace is emitted unconditionally, while other diagnostics
# in this file are gated on $ENV{DEBUG} -- confirm that is intended.
53 warn "## json_set $bucket $key $json\n";
# Body of the sub its trace message calls json_get (sub header and closing
# lines not visible in this excerpt). Fetches the value stored under $key in
# the $bucket client and, when that value is true, returns it JSON-decoded.
# NOTE(review): $data is unpacked but not used in the visible lines.
58 my ($self,$bucket,$key,$data) = @_;
59 if ( my $json = $self->{$bucket}->get($key) ) {
60 warn "## json_get $bucket $key $json\n";
61 return decode_json $json;
# Stores a user record in the 'users' bucket, keyed by the record's 'login'
# field. $data is passed to json_set, which requires a reference.
# (Sub header not visible in this excerpt.)
66 my ($self,$data) = @_;
67 $self->json_set( 'users', $data->{login}, $data );
# Loads a user record from the 'users' bucket and adds 'usage' and 'status'
# fields obtained from $self->usage and $self->status. $login may be a login
# string or a hash ref with a 'login' key. (Sub header and the return
# statement are not visible in this excerpt.)
71 my ($self,$login) = @_;
72 $login = $login->{login} if ref $login;
73 my $user = $self->json_get( 'users', $login );
# NOTE(review): if json_get yields undef for an unknown login, the two
# assignments below autovivify $user into a fresh hash -- confirm intended.
74 $user->{usage} = $self->usage( $login );
75 $user->{status} = $self->status( $login );
76 warn "## user ",dump($user) if $ENV{DEBUG};
# Fragment of the per-login status accessor (presumably the sub invoked as
# $self->status; its header and the lines between these statements are not
# visible in this excerpt). $login may be a login string or a hash ref with a
# 'login' key.
81 my ($self,$login,$message) = @_;
82 $login = $login->{login} if ref $login;
# store $message in the 'session' client under "<login>:status"
84 $self->{session}->set( "$login:status" => $message );
# read the stored status; the condition that selects between the set above
# and this get is in lines missing from this excerpt
87 $self->{session}->get( "$login:status" );
# Decrements the "<login>:usage" counter in the 'session' client by
# $data->{size}. $data is a hash ref with 'login' and 'size' keys.
# (Sub header not visible; presumably the sub invoked as usage_decr.)
92 my ($self,$data) = @_;
93 $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
# Increments the "<login>:usage" counter in the 'session' client by
# $data->{size}. $data is a hash ref with 'login' and 'size' keys.
# (Sub header not visible; presumably the sub invoked as usage_incr.)
97 my ($self,$data) = @_;
98 $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
# Reads the "<login>:usage" value from the 'session' client. $login may be a
# login string or a hash ref with a 'login' key. The get is the last visible
# statement, so its result is presumably the sub's return value.
102 my ($self,$login) = @_;
103 $login = $login->{login} if ref $login;
104 $self->{session}->get( $login . ':usage' );
# Fragment of the CouchDB helper (labelled "couchdb" in its trace; the sub
# header, the line that obtains $fmt and the closing lines are not visible).
# Builds the request URL with sprintf from $fmt and the remaining arguments,
# fetches it with get, and JSON-decodes a true response.
# NOTE(review): in the visible lines the arguments are placed into the URL
# without any escaping -- confirm callers only pass URL-safe values.
110 my $url = sprintf $fmt, @_;
112 warn "# couchdb $url\n";
113 if ( my $json = get $url ) {
114 warn "## $url $json\n";
115 my $r = decode_json $json;
# Fragment that recomputes a login's usage from the CouchDB view
# files/login_usage and writes the result into the 'session' client under
# "<login>:usage". The sub header, the declaration of $usage and the values
# supplied for the two %s placeholders are in lines missing from this excerpt.
121 my ($self,$login) = @_;
122 $login = $login->{login} if ref $login;
126 if ( my $r = $self->couchdb(
127 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
# take the value of the first returned row; a missing value counts as 0
132 $usage = $r->{rows}->[0]->{value};
133 $usage = 0 unless defined $usage;
136 $self->{session}->set( $login . ':usage' => $usage );
# Builds the storage key for a file record as "<login>:<file>" from the
# 'login' and 'file' fields of $data (presumably the helper invoked as
# _file_key; the line that obtains $data is not visible in this excerpt).
# The md5_base64 variant of the key is left commented out.
141 #md5_base64( $data->{login} . '/' . $data->{file} );
142 $data->{login} . ':' . $data->{file};
# Stores the file record $data in the 'files' bucket under the key produced
# by _file_key. (Sub header not visible; presumably invoked as file_set.)
146 my ($self,$data) = @_;
147 $self->json_set( 'files', _file_key($data), $data );
# Looks up the file record for $data's login/file pair in the 'files' bucket
# via json_get. (Sub header not visible; presumably invoked as file_get.)
151 my ($self,$data) = @_;
152 $self->json_get( 'files', _file_key($data) );
# Fragment of the handler for a changed file (presumably the sub invoked as
# modify_file; header and some lines are not visible). When a record already
# exists for this login/file, usage is decremented before new_file records
# the file again; whether new_file runs inside or after the conditional
# cannot be told from the visible lines.
156 my ( $self,$data ) = @_;
158 if ( my $old = $self->file_get( $data ) ) {
# NOTE(review): the decrement is given the incoming $data rather than $old,
# so the amount subtracted is the new record's size -- confirm intended.
159 $self->usage_decr( $data );
162 $self->new_file($data);
# Records a file: stores its record with file_set and then adds its size to
# the login's usage with usage_incr. (Sub header not visible; presumably the
# sub invoked as new_file.)
166 my ( $self,$data ) = @_;
167 $self->file_set($data);
168 $self->usage_incr($data);
# Removes a file: subtracts $data's size from the login's usage and deletes
# the record from the 'files' client. (Sub header not visible; presumably the
# sub invoked as remove_file.)
172 my ( $self, $data ) = @_;
# NOTE(review): the amount subtracted is whatever 'size' the caller put in
# $data; the stored record is not consulted in the visible lines.
173 $self->usage_decr( $data );
174 my $k = _file_key $data;
175 $self->{files}->delete( $k );
# Only the argument unpacking of this sub is visible in this excerpt; its
# name and the rest of its body cannot be determined from here.
179 my ( $self, $data ) = @_;
# Fragment of the symlink handler (labelled "new_link" in its trace; the sub
# header and many intermediate lines are not visible in this excerpt).
# A link whose path has the form "<dir>.send/<to>/<name>" is treated as a
# request to send the link's target to user <to>: the target is resolved,
# hard-linked under users/<to>/blob and recorded via new_file.
184 my ( $self, $data ) = @_;
186 warn "# new_link ",dump $data;
# captures: optional leading directory, recipient login, name at recipient
188 if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
189 my ( $dir, $to, $name ) = ( $1, $2, $3 );
190 my $path = "users/$data->{login}/blob/" . $data->{file};
# NOTE(review): readlink returns undef on failure; no check is visible here
191 my $link_to = readlink $path;
# drop the leading "/rsyncd-munged/" prefix from the link target
193 if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
# $s is declared in a line missing from this excerpt
196 $s =~ s{/[^/]+$}{}; # strip filename
# each parent-directory step removed from the target also strips one
# trailing component from $s
# NOTE(review): the dots in /../ are unescaped, so this matches any two
# characters between slashes, not only a literal ".."
197 while ( $link_to =~ s{/../}{/} ) {
198 $s =~ s{/[^/]+$}{} || die "can't strip $s";
199 warn "## simplify $s $link_to\n";
# recipient's blob directory; the condition guarding the error below is in a
# missing line
203 my $d = "users/$to/blob";
205 warn "ERROR: no to user $to in $d";
210 # $name can contain directories so we must create them
# $to_dir is declared in a line missing from this excerpt
212 $to_dir =~ s{/[^/]+$}{};
213 make_path $to_dir if ! -e $to_dir;
216 warn "ERROR: can't find source $s";
219 warn "link $s -> $d\n";
# NOTE(review): "my ... if ..." is a conditional declaration, which perlsyn
# documents as having undefined behaviour; declare first, then assign.
222 my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
# look up the sender's file record (the lookup arguments are in missing lines)
224 my $origin = $self->file_get({
228 $self->new_file($origin);
229 warn "INFO: sent file ",dump($origin);
234 warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
# Fragment of the dispatcher for one transferred item (sub header, the line
# that sets $type and several closing lines are not visible). The 'itemize'
# field of $data decides the action: a leading "c" or ">" followed by f, d or
# L routes to the file, directory or link handler, and an itemize string
# containing "*deleting" routes to remove_file.
240 my ( $self,$data ) = @_;
242 my $blob = "users/$data->{login}/blob";
243 my $path = "$blob/$data->{file}";
245 if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
# regular file: update the record, then deduplicate the stored file
248 if ( $type eq 'f' ) {
249 $self->modify_file( $data );
250 $self->dedup( $data, $path );
251 } elsif ( $type eq 'd' ) {
252 $self->make_dir( $data );
253 } elsif ( $type eq 'L' ) {
254 $self->new_link( $data );
# any other type is fatal (the else line itself is not visible here)
256 die "unknown type $type ", dump $data;
258 } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
259 $self->remove_file($data);
# Fragment of the checksum pool helper (presumably the function invoked as
# md5pool; header, else line and closing lines are not visible). Takes a file
# path and its md5 and uses the relative directory 'md5' as a pool with one
# entry per checksum.
# NOTE(review): the results of mkdir, rename and link are not checked in the
# visible lines.
265 my ( $path, $md5 ) = @_;
267 my $pool = 'md5'; # FIXME sharding?
268 mkdir $pool unless -e $pool;
# pool hit: move the file aside as "<path>.dedup" and make $path a hard link
# to the pooled copy; no removal of the .dedup file is visible here
270 if ( -e "$pool/$md5" ) {
271 warn "dedup hit $md5 $path\n";
272 my $dedup = $path . '.dedup';
273 rename $path, $dedup;
274 link "$pool/$md5", $path;
# presumably the pool-miss branch: add this file to the pool under its md5
277 link $path, "$pool/$md5";
# A string of 32 spaces; a record whose 'md5' field equals this value is not
# passed to md5pool (see the comparison against $data->{md5} in this file).
281 my $empty_md5 = " " x 32;
# Fragment of the deduplication entry point (invoked as $self->dedup($data,
# $path); sub header and many structural lines are not visible). A file named
# "md5sum" is treated as an import list: each entry whose checksum already
# exists in the 'md5' pool is hard-linked into the user's blob directory and
# recorded. Other files are added to the pool by checksum.
284 my ( $self, $data, $path ) = @_;
# matches "md5sum" at top level or inside any directory; $dir used below is
# set in a line missing from this excerpt
286 if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
289 warn "IMPORT ", $data->{file}, "\n";
# NOTE(review): the result of open is not checked
290 open(my $md5sum, '<', $path);
# each entry is "<md5> <file>"; the limit of 2 keeps whitespace inside names
293 my ( $md5, $file ) = split(/\s+/,$_,2);
294 if ( ! -e "md5/$md5" ) {
295 warn "MISSING $md5 $file\n";
298 my $new = "users/$data->{login}/blob/$dir$file";
300 # create path from md5sum file
# NOTE(review): "my ... if ..." is a conditional declaration, which perlsyn
# documents as having undefined behaviour; declare first, then assign.
301 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
302 make_path $only_dir unless -d $only_dir;
# link returns true on success, so this counts successfully imported files
303 $imported += link "md5/$md5", $new;
# fields of the synthetic record $fake (its remaining fields are in lines
# missing from this excerpt)
305 login => $data->{login},
306 host => $data->{host},
307 file => $dir . $file,
311 $self->new_file($fake);
312 warn "import from $path ",dump($fake);
314 md5pool $new => $md5;
317 print "INFO imported $imported files from ",dump($data);
# pool the file unless its md5 is the 32-space placeholder
320 if ( $data->{md5} ne $empty_md5 ) {
321 md5pool $path => $data->{md5};