1 package CloudStore::Couchbase;
7 use File::Path qw(make_path);
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
# Presumably the body of the constructor (the enclosing `sub` line and the
# closing braces are not visible in this excerpt). One memcached-protocol
# client is created per bucket and stored on the object under the bucket name.
# $buckets is defined outside this view; each value is used as a local port.
27 foreach my $bucket ( keys %$buckets ) {
28 my $port = $buckets->{$bucket};
# NOTE(review): `new Cache::Memcached {...}` is indirect object syntax;
# `Cache::Memcached->new({...})` is the unambiguous spelling.
29 my $server = new Cache::Memcached {
30 'servers' => [ "127.0.0.1:$port" ],
31 'debug' => $ENV{DEBUG},
32 # 'compress_threshold' => 10_000,
34 #$server->set_servers($array_ref);
35 #$server->set_compress_threshold(10_000);
# Compression is explicitly switched off on this client.
36 $server->enable_compress(0);
# Other methods reach the client as $self->{<bucket name>}.
37 $self->{$bucket} = $server;
# Dump the whole object to STDERR only when DEBUG is set in the environment.
41 warn "# new ",dump $self if $ENV{DEBUG};
# Presumably json_set (the `sub` line is outside this excerpt; the name is
# inferred from callers such as $self->json_set( 'users', ... )).
# Serialises $data to JSON and stores it under $key in the client held in
# $self->{$bucket}.
#   $bucket - key of $self that holds the client to write to
#   $key    - storage key
#   $data   - reference to serialise
47 my ($self,$bucket,$key,$data) = @_;
# A non-reference $data is fatal; confess reports it with a stack trace.
48 confess "data not ref ",dump($data) unless ref $data;
49 my $json = encode_json $data;
50 $self->{$bucket}->set( $key => $json );
# Logged on every call; this warn is not guarded by $ENV{DEBUG}.
51 warn "## $bucket set $key $json\n";
# Presumably json_get (the `sub` line is outside this excerpt; the name is
# inferred from callers such as $self->json_get( 'users', $login )).
# Fetches $key from the client in $self->{$bucket} and returns the decoded
# JSON structure.
# $data is unpacked from @_ but not used by any line visible here.
# NOTE(review): what is returned when the key is missing (or the stored value
# is false) is decided on lines not shown in this excerpt - confirm.
56 my ($self,$bucket,$key,$data) = @_;
57 if ( my $json = $self->{$bucket}->get($key) ) {
# Logged on every hit; not guarded by $ENV{DEBUG}.
58 warn "## $bucket get $key $json\n";
59 return decode_json $json;
# Stores a user record (the `sub` line, and therefore the method name, is not
# visible in this excerpt).
#   $data - hash reference; its {login} value is used as the storage key
# The whole $data structure is written to the 'users' bucket via json_set.
64 my ($self,$data) = @_;
65 $self->json_set( 'users', $data->{login}, $data );
# Loads a user record and decorates it with live usage and status values
# (the `sub` line, and therefore the method name, is not visible here).
#   $login - login string, or a hash reference carrying a {login} key
69 my ($self,$login) = @_;
70 $login = $login->{login} if ref $login;
71 my $user = $self->json_get( 'users', $login );
# NOTE(review): if json_get yields undef (no such user), the assignment below
# autovivifies $user into a hash holding only usage/status - confirm intended.
72 $user->{usage} = $self->usage( $login );
73 $user->{status} = $self->status( $login );
# Logged on every call; not guarded by $ENV{DEBUG}.
74 warn "## user ",dump($user);
# The statement that returns $user is presumably on a line not shown here.
# Presumably status (name inferred from the $self->status( $login ) call
# elsewhere in this file; the `sub` line is not visible).
# Reads, and optionally updates, the "<login>:status" entry in the client held
# in $self->{session}.
#   $login   - login string, or a hash reference carrying a {login} key
#   $message - new status text; callers that only read pass no message
79 my ($self,$login,$message) = @_;
80 $login = $login->{login} if ref $login;
# NOTE(review): the lines around this set are missing from the excerpt;
# presumably it only runs when $message was supplied - confirm.
82 $self->{session}->set( "$login:status" => $message );
# Last visible statement: the current status value is the result.
84 $self->{session}->get( "$login:status" );
# Presumably usage_decr (name inferred from $self->usage_decr( $data ) calls;
# the `sub` line is not visible).
# Decrements the "<login>:usage" counter in $self->{session}.
#   $data - hash reference providing {login} and {size} (the decrement amount)
89 my ($self,$data) = @_;
90 $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
# Presumably usage_incr (name inferred from $self->usage_incr($data) calls;
# the `sub` line is not visible).
# Increments the "<login>:usage" counter in $self->{session}.
#   $data - hash reference providing {login} and {size} (the increment amount)
94 my ($self,$data) = @_;
95 $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
# Presumably usage (name inferred from the $self->usage( $login ) call
# elsewhere in this file; the `sub` line is not visible).
# Returns the value stored under "<login>:usage" in $self->{session}.
#   $login - login string, or a hash reference carrying a {login} key
99 my ($self,$login) = @_;
100 $login = $login->{login} if ref $login;
101 $self->{session}->get( $login . ':usage' );
# Presumably couchdb (name inferred from the $self->couchdb( ... ) call and the
# log prefix below; the `sub` line is not visible).
# Builds a URL from a sprintf format plus arguments, fetches it and decodes the
# JSON response.
# $fmt is assigned on a line missing from this excerpt; the remaining @_ are
# the values substituted into it.
107 my $url = sprintf $fmt, @_;
109 warn "# couchdb $url\n";
# `get` is presumably an imported HTTP helper such as LWP::Simple::get - the
# import is not visible here, TODO confirm. A false response skips the block.
110 if ( my $json = get $url ) {
111 warn "## $url $json\n";
112 my $r = decode_json $json;
# How $r is handed back to the caller is on lines not shown here.
# Seeds the "<login>:usage" entry in $self->{session} from the CouchDB
# login_usage view (the `sub` line, and therefore the method name, is not
# visible in this excerpt).
#   $login - login string, or a hash reference carrying a {login} key
# $usage is declared, and the values for the two %s placeholders in the URL
# are supplied, on lines missing from this view.
118 my ($self,$login) = @_;
119 $login = $login->{login} if ref $login;
123 if ( my $r = $self->couchdb(
124 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
129 $usage = $r->{rows}->[0]->{value};
# A view result without a value for this login counts as zero usage.
130 $usage = 0 unless defined $usage;
133 $self->{session}->set( $login . ':usage' => $usage );
# Presumably _file_key (name inferred from _file_key($data) calls; the `sub`
# line and the line that unpacks $data are not visible).
# Builds the storage key for a file record as "<login>:<file>".
# The commented-out line is an earlier md5_base64-based key, kept as-is.
138 #md5_base64( $data->{login} . '/' . $data->{file} );
139 $data->{login} . ':' . $data->{file};
# Presumably file_set (name inferred from the $self->file_set($data) call;
# the `sub` line is not visible).
# Writes the file record $data to the 'files' bucket under the key produced
# by _file_key($data).
143 my ($self,$data) = @_;
144 $self->json_set( 'files', _file_key($data), $data );
# Presumably file_get (name inferred from the $self->file_get( $data ) call;
# the `sub` line is not visible).
# Looks up the record stored in the 'files' bucket for the key produced by
# _file_key($data) and returns whatever json_get returns.
148 my ($self,$data) = @_;
149 $self->json_get( 'files', _file_key($data) );
# Presumably modify_file (name inferred from the $self->modify_file( $data )
# call; the `sub` line is not visible).
# Handles a file that may already have a stored record: usage is decremented
# when a record exists, then the file is recorded via new_file.
153 my ( $self,$data ) = @_;
155 if ( my $old = $self->file_get( $data ) ) {
# NOTE(review): the decrement uses the incoming $data (its {size}), not the
# size held in $old; $old is not used by any visible line - confirm intended.
156 $self->usage_decr( $data );
# The lines between the decrement and this call are missing from the excerpt,
# so whether new_file runs only after the `if` closes cannot be seen here.
159 $self->new_file($data);
# Presumably new_file (name inferred from $self->new_file($data) calls; the
# `sub` line is not visible).
# Stores the file record and then adds its size to the owner's usage counter.
163 my ( $self,$data ) = @_;
164 $self->file_set($data);
165 $self->usage_incr($data);
# Presumably remove_file (name inferred from the $self->remove_file($data)
# call; the `sub` line is not visible).
# Decrements the owner's usage counter and deletes the file record from the
# client held in $self->{files}.
169 my ( $self, $data ) = @_;
# The counter is decremented without first checking that a record exists.
170 $self->usage_decr( $data );
171 my $k = _file_key $data;
172 $self->{files}->delete( $k );
# Argument unpacking for a method whose `sub` line and remaining body are not
# visible in this excerpt; presumably make_dir, which is called elsewhere in
# this file as $self->make_dir( $data ) - TODO confirm against the full file.
176 my ( $self, $data ) = @_;
# Presumably new_link (name inferred from the log prefix below and the
# $self->new_link( $data ) call; the `sub` line is not visible).
# Handles a received symlink. A link whose path contains ".send/<to>/<name>"
# is treated as a request to send the link's target to user <to>.
#   $data - hash reference; {login} and {file} are read here
181 my ( $self, $data ) = @_;
183 warn "# new_link ",dump $data;
# Captures: directory prefix before ".send/", recipient login, target name.
185 if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
186 my ( $dir, $to, $name ) = ( $1, $2, $3 );
187 my $path = "users/$data->{login}/blob/" . $data->{file};
# NOTE(review): readlink returns undef when $path is not a symlink; the next
# two lines would then operate on an undefined value.
188 my $file = readlink $path;
189 warn "## $path -> $file";
# Only link targets starting with the literal "/rsyncd-munged/../../" prefix
# are accepted; the prefix is replaced by $dir.
190 if ( $file =~ s{^\Q/rsyncd-munged/../../\E}{$dir} ) {
191 warn "SEND To:$to Name:$name File:$file\n";
192 my $s = "users/$data->{login}/blob/$file";
193 my $d = "users/$to/blob";
# The recipient's blob directory must already exist.
194 die "no user $to" unless -e $d;
197 # since $name can contain directories we must create them
# $to_dir is assigned on a line missing from this excerpt; its last path
# component is stripped so that only the directory part is created.
199 $to_dir =~ s{/[^/]+$}{};
200 make_path $to_dir if ! -e $to_dir;
# The call that actually creates the link is not visible in this excerpt.
202 warn "link $s -> $d\n";
205 warn "ERROR: can't SEND To:$to Name:$name File:$file";
# Dispatches one transfer record to the matching handler (the `sub` line, and
# therefore the method name, is not visible in this excerpt).
#   $data - hash reference; {login}, {file} and {itemize} are read here
211 my ( $self,$data ) = @_;
213 my $blob = "users/$data->{login}/blob";
214 my $path = "$blob/$data->{file}";
# {itemize} starting with 'c' or '>' followed by f, d or L selects a handler.
# $type is assigned on a line missing here - presumably from the capture.
216 if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
219 if ( $type eq 'f' ) {
# 'f': update the stored record, then run dedup on the stored path.
220 $self->modify_file( $data );
221 $self->dedup( $data, $path );
222 } elsif ( $type eq 'd' ) {
223 $self->make_dir( $data );
224 } elsif ( $type eq 'L' ) {
225 $self->new_link( $data );
227 die "unknown type $type ", dump $data;
# An {itemize} containing "*deleting" removes the file record.
229 } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
230 $self->remove_file($data);
# Presumably md5pool (name inferred from `md5pool $path => ...` calls; the
# `sub` line is not visible). Called as a plain function, not a method.
# Keeps one hard link per content hash in the pool directory.
#   $path - file to pool
#   $md5  - its hash, used as the file name inside the pool
# NOTE(review): none of the mkdir/rename/link results are checked in the
# visible lines, so a failure here passes silently.
236 my ( $path, $md5 ) = @_;
# The pool is a relative path, so it resolves against the current directory.
238 my $pool = 'md5'; # FIXME sharding?
239 mkdir $pool unless -e $pool;
241 if ( -e "$pool/$md5" ) {
242 warn "dedup hit $md5 $path\n";
# Pool hit: move the new copy aside and hard-link the pooled file into its
# place. What happens to the ".dedup" file afterwards is not visible here.
243 my $dedup = $path . '.dedup';
244 rename $path, $dedup;
245 link "$pool/$md5", $path;
# Presumably the no-hit branch (the line opening it is not shown): the file
# itself becomes the pooled copy.
248 link $path, "$pool/$md5";
# File-level lexical holding a string of 32 spaces. Elsewhere in this file
# $data->{md5} is compared against it before a file is passed to md5pool, so
# it presumably marks a record without a checksum - TODO confirm.
252 my $empty_md5 = " " x 32;
# Presumably dedup (name inferred from the $self->dedup( $data, $path ) call;
# the `sub` line is not visible).
# Two paths are visible: a file named "md5sum" is treated as an import list,
# and a file with a non-empty {md5} is handed to md5pool.
#   $data - hash reference; {file}, {login}, {host} and {md5} are read here
#   $path - location of the received file
# $dir, $imported and $fake are declared on lines missing from this excerpt.
255 my ( $self, $data, $path ) = @_;
# Matches "md5sum" at top level or inside any directory.
257 if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
260 warn "IMPORT ", $data->{file}, "\n";
# NOTE(review): the result of open is not checked on this line.
261 open(my $md5sum, '<', $path);
# Each record in $_ is split into a hash and a file name (at most 2 fields);
# the loop that fills $_ is not visible here.
264 my ( $md5, $file ) = split(/\s+/,$_,2);
# "md5" is the same relative pool directory that md5pool uses.
265 if ( ! -e "md5/$md5" ) {
266 warn "MISSING $md5 $file\n";
269 my $new = "users/$data->{login}/blob/$dir$file";
271 # create path from md5sum file
# NOTE(review): `my ... if ...` has undefined behaviour per perlsyn; when the
# match fails, $only_dir is not reliably undef.
272 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
273 make_path $only_dir unless -d $only_dir;
# link returns true on success, so $imported counts the links created.
274 $imported += link "md5/$md5", $new;
# Visible fields of the record passed to new_file as $fake; any further
# fields are on lines not shown.
276 login => $data->{login},
277 host => $data->{host},
278 file => $dir . $file,
282 $self->new_file($fake);
283 warn "fake ",dump($fake);
285 md5pool $new => $md5;
# Printed with print (not warn), unlike the other diagnostics in this block.
288 print "INFO imported $imported files from ",dump($data);
# Presumably the branch for ordinary files: only a record whose {md5} differs
# from $empty_md5 is pooled.
291 if ( $data->{md5} ne $empty_md5 ) {
292 md5pool $path => $data->{md5};