1 package CloudStore::Couchbase;
7 use File::Path qw(make_path);
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
# Constructor fragment (the sub header, the definition of $buckets and the
# closing braces are not visible here): creates one Cache::Memcached client
# per key of %$buckets and stores it on $self under the bucket name.
27 foreach my $bucket ( keys %$buckets ) {
# The value stored for each bucket is used as the port on 127.0.0.1.
28 my $port = $buckets->{$bucket};
# NOTE(review): indirect object syntax; Cache::Memcached->new(...) is the
# unambiguous spelling, but the closing bracket lies outside this view.
29 my $server = new Cache::Memcached {
30 'servers' => [ "127.0.0.1:$port" ],
31 'debug' => $ENV{DEBUG},
32 # 'compress_threshold' => 10_000,
34 #$server->set_servers($array_ref);
35 #$server->set_compress_threshold(10_000);
# enable_compress is called with a false value for every client.
36 $server->enable_compress(0);
37 $self->{$bucket} = $server;
# Dump the object to STDERR only when DEBUG is set in the environment.
41 warn "# new ",dump $self if $ENV{DEBUG};
# Body of the JSON setter (presumably json_set, judging by the calls further
# down; the sub header is not visible).
#
# $bucket - name of the client stored on $self to write to
# $key    - key to store under
# $data   - reference to serialize with encode_json
47 my ($self,$bucket,$key,$data) = @_;
# Non-reference data is rejected with a stack trace.
48 confess "data not ref ",dump($data) unless ref $data;
49 my $json = encode_json $data;
50 $self->{$bucket}->set( $key => $json );
# Every write is traced to STDERR; this is not gated on DEBUG.
51 warn "## $bucket set $key $json\n";
# Body of the JSON getter (presumably json_get; the sub header is not
# visible): reads $key from the client stored on $self under $bucket and
# returns the decoded structure.
# $data is unpacked but not used in the visible lines.
56 my ($self,$bucket,$key,$data) = @_;
# A false result from get() skips the trace and the decode.
57 if ( my $json = $self->{$bucket}->get($key) ) {
58 warn "## $bucket get $key $json\n";
59 return decode_json $json;
# Stores a user record (sub name not visible): writes $data to the 'users'
# bucket as JSON, keyed by $data->{login}.
64 my ($self,$data) = @_;
65 $self->json_set( 'users', $data->{login}, $data );
# Fetches a user record (sub name not visible): reads the entry keyed by
# $login from the 'users' bucket via json_get.
69 my ($self,$login) = @_;
70 $self->json_get( 'users', $login );
# Decrements a usage counter (presumably usage_decr; sub header not visible):
# calls decr on the "<login>:usage" key of the session client with
# $data->{size} as the amount.
74 my ($self,$data) = @_;
75 $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
# Increments a usage counter (presumably usage_incr; sub header not visible):
# calls incr on the "<login>:usage" key of the session client with
# $data->{size} as the amount.
79 my ($self,$data) = @_;
80 $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
# Reads the usage counter (sub name not visible): fetches the
# "<login>:usage" key from the session client for $data->{login}.
84 my ($self,$data) = @_;
85 $self->{session}->get( $data->{login} . ':usage' );
# Recomputes the usage counter from the login_usage view and stores it in the
# session client under "<login>:usage" (sub name not visible). The statements
# that build $url from the format string below and that declare $usage are
# not visible here.
89 my ($self,$data) = @_;
# NOTE(review): the two %s placeholders end up inside start_key/end_key of the
# query string; whatever is substituted should be URI-escaped - TODO confirm
# against the hidden sprintf arguments.
94 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"'
99 warn "usage from $url";
# get() is presumably an imported HTTP helper such as LWP::Simple::get - the
# import is not visible. A false response skips the decode.
100 if ( my $json = get $url ) {
101 warn "# JSON = $json\n";
102 my $r = decode_json $json;
# The value of the first returned row is taken; a missing value counts as 0.
104 $usage = $r->{rows}->[0]->{value};
105 $usage = 0 unless defined $usage;
108 $self->{session}->set( $data->{login} . ':usage' => $usage );
# Key builder for file records (presumably _file_key; the sub header and the
# line that unpacks $data are not visible): the key is "<login>:<file>".
# The md5_base64 variant below is disabled.
113 #md5_base64( $data->{login} . '/' . $data->{file} );
114 $data->{login} . ':' . $data->{file};
# Stores a file record (presumably file_set; sub header not visible): writes
# $data as JSON to the 'files' bucket under the key built by _file_key.
118 my ($self,$data) = @_;
119 $self->json_set( 'files', _file_key($data), $data );
# Fetches a file record (presumably file_get; sub header not visible): reads
# the 'files' bucket entry whose key _file_key builds from $data.
123 my ($self,$data) = @_;
124 $self->json_get( 'files', _file_key($data) );
# Body of modify_file (sub header and the closing brace of the if are not
# visible; the itemize dispatcher calls this for changed or created files).
#
# $data - hash ref describing the incoming file; the helpers called here read
#         its login, file and size fields.
#
# If a record for this file is already stored, the usage counter is first
# reduced by the size held in that stored record. new_file() then stores the
# new record and adds the new size.
128 my ( $self,$data ) = @_;
130 if ( my $old = $self->file_get( $data ) ) {
# Subtract the previously stored size, not the incoming one: decrementing by
# $data would be cancelled by the increment in new_file(), leaving the
# counter unchanged whenever a file changes size.
131 $self->usage_decr( $old );
134 $self->new_file($data);
# Registers a file (presumably new_file; sub header not visible): stores the
# record with file_set, then adds its size to the usage counter with
# usage_incr.
138 my ( $self,$data ) = @_;
139 $self->file_set($data);
140 $self->usage_incr($data);
# Removes a file record (presumably remove_file; sub header not visible):
# decrements the usage counter, then deletes the record from the files client.
144 my ( $self, $data ) = @_;
# NOTE(review): the amount subtracted is $data->{size} of the incoming event,
# not the size in the stored record - confirm that deletion events carry the
# file size.
145 $self->usage_decr( $data );
146 my $k = _file_key $data;
147 $self->{files}->delete( $k );
# Argument unpacking of a method whose header and remaining body are not
# visible here (possibly make_dir, which the dispatcher calls - TODO confirm).
151 my ( $self, $data ) = @_;
# Dispatcher on $data->{itemize} (sub name not visible; the field presumably
# holds an rsync itemize string - TODO confirm). Changed or created entries
# are routed by type, and "*deleting" entries go to remove_file.
156 my ( $self,$data ) = @_;
# Location of the file under the per-user blob directory.
158 my $blob = "users/$data->{login}/blob";
159 my $path = "$blob/$data->{file}";
# The capture group is 'f' or 'd'; $type is presumably assigned from it on a
# line that is not visible.
161 if ( $data->{itemize} =~ m/^[c>]([fd])/ ) { # received change/create
164 if ( $type eq 'f' ) {
# Files: update the record and usage counter, then deduplicate the content.
165 $self->modify_file( $data );
166 $self->dedup( $data, $path );
167 } elsif ( $type eq 'd' ) {
168 $self->make_dir( $data );
170 die "unknown type $type ", dump $data;
172 } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
173 $self->remove_file($data);
# Body of md5pool (sub header, the else line and closing braces are not
# visible). A plain function, not a method.
#
# $path - file to deduplicate
# $md5  - digest used as the file name inside the pool directory
#
# If the pool already holds $md5, $path is moved aside to "$path.dedup" and
# replaced by a hard link to the pooled copy. Otherwise $path itself is
# hard-linked into the pool. Failures of mkdir, rename and link are reported
# on STDERR instead of being silently ignored.
179 my ( $path, $md5 ) = @_;
181 my $pool = 'md5'; # FIXME sharding?
182 mkdir $pool or warn "mkdir $pool failed: $!\n" unless -e $pool;
184 if ( -e "$pool/$md5" ) {
185 warn "dedup hit $md5 $path\n";
186 my $dedup = $path . '.dedup';
# Remember whether the move happened so that a failed link can be undone.
187 my $moved = rename $path, $dedup or warn "rename $path $dedup failed: $!\n";
# If the link cannot be created, move the original back so that $path is
# never left missing.
188 link "$pool/$md5", $path or do { warn "link $pool/$md5 $path failed: $!\n"; rename $dedup, $path if $moved };
191 link $path, "$pool/$md5" or warn "link $path $pool/$md5 failed: $!\n";
# Sentinel md5 value made of 32 spaces; dedup skips pooling when a record's
# md5 equals it.
195 my $empty_md5 = " " x 32;
# Body of dedup (sub header, the loop header, the opening of the $fake hash
# and several closing braces are not visible).
#
# $data - hash ref describing the incoming file (login, host, file and md5 are
#         read here)
# $path - location of that file on disk
#
# A file named "md5sum" (optionally inside a directory) is treated as a
# listing of "<md5> <file>" entries. Each entry whose md5 exists in the md5/
# pool is hard-linked into users/<login>/blob/, registered with new_file()
# and passed to md5pool(). In the remaining visible branch the file at $path
# is pooled under $data->{md5}, unless that equals $empty_md5.
198 my ( $self, $data, $path ) = @_;
200 if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
203 warn "IMPORT ", $data->{file}, "\n";
# Fail loudly if the listing cannot be read, instead of reading from an
# unopened handle and importing nothing.
204 open(my $md5sum, '<', $path) or die "can't open $path: $!";
207 my ( $md5, $file ) = split(/\s+/,$_,2);
208 if ( ! -e "md5/$md5" ) {
209 warn "MISSING $md5 $file\n";
# NOTE(review): $file comes from the listing's content and is interpolated
# into the target path unchecked; entries with ".." components should
# probably be rejected - confirm against the loop lines not visible here.
212 my $new = "users/$data->{login}/blob/$dir$file";
214 # create path from md5sum file
# List assignment from the match replaces "my ... if ...", whose behaviour
# is undefined.
215 my ($only_dir) = $new =~ m{^(.+)/[^/]+$};
216 make_path $only_dir unless -d $only_dir;
# link returns true on success, so $imported counts the created links.
217 $imported += link "md5/$md5", $new;
219 login => $data->{login},
220 host => $data->{host},
221 file => $dir . $file,
225 $self->new_file($fake);
226 warn "fake ",dump($fake);
228 md5pool $new => $md5;
231 print "INFO imported $imported files from ",dump($data);
234 if ( $data->{md5} ne $empty_md5 ) {
235 md5pool $path => $data->{md5};