1 package CloudStore::Couchbase;
7 use File::Path qw(make_path);
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
# Fragment of what looks like the constructor: the enclosing sub header and
# the places where $self and $buckets get their values are not visible in
# this excerpt.
# For each key of %$buckets one Cache::Memcached client is created for
# 127.0.0.1 on the port stored under that key, and the client is saved in
# $self under the same key.
25 foreach my $bucket ( keys %$buckets ) {
26 my $port = $buckets->{$bucket};
# NOTE(review): indirect object syntax; Cache::Memcached->new({ ... }) is the
# unambiguous spelling of the same call.
27 my $server = new Cache::Memcached {
28 'servers' => [ "127.0.0.1:$port" ],
29 'debug' => $ENV{DEBUG},
30 # 'compress_threshold' => 10_000,
32 #$server->set_servers($array_ref);
33 #$server->set_compress_threshold(10_000);
# Compression is switched off explicitly on every client.
34 $server->enable_compress(0);
35 $self->{$bucket} = $server;
# Debug trace: dump the assembled object through warn.
39 warn "# new ",dump $self;
# Body fragment, presumably of usage_decr (the sub header is not visible in
# this excerpt).
# Calls decr on the client kept in $self->{session}; the key is the login
# followed by :usage and the amount passed is $data->{size}.
45 my ($self,$data) = @_;
46 $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
# Body fragment, presumably of usage_incr (the sub header is not visible in
# this excerpt).
# Calls incr on the client kept in $self->{session}; the key is the login
# followed by :usage and the amount passed is $data->{size}.
50 my ($self,$data) = @_;
51 $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
# Body fragment of a usage reader (sub name not visible in this excerpt).
# Fetches the value stored under the login followed by :usage from the
# client kept in $self->{session}. Presumably this get is the last
# statement of the sub and therefore its return value -- TODO confirm.
55 my ($self,$data) = @_;
56 $self->{session}->get( $data->{login} . ':usage' );
# Body fragment of a sub that recomputes the stored usage counter for one
# login (sub name not visible; the lines that build $url and declare $usage
# are also missing from this excerpt).
60 my ($self,$data) = @_;
65 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"'
# The URL above carries two %s placeholders; presumably both are filled with
# the login by a sprintf that is not visible here -- TODO confirm.
70 warn "usage from $url";
# NOTE(review): get is presumably an HTTP fetch helper imported elsewhere in
# the file; a false result skips the whole branch.
71 if ( my $json = get $url ) {
72 warn "# JSON = $json\n";
73 my $r = decode_json $json;
# Take the value of the first row of the decoded response; fall back to 0
# when that value is not defined.
75 $usage = $r->{rows}->[0]->{value};
76 $usage = 0 unless defined $usage;
# Store the result under the login followed by :usage in the session client.
79 $self->{session}->set( $data->{login} . ':usage' => $usage );
# Fragment of a key-building helper (sub header and argument unpacking are
# not visible in this excerpt).
# The key is the login and the file name joined by a colon. The commented
# line is an alternative that hashes login/file with md5_base64 instead.
84 #md5_base64( $data->{login} . '/' . $data->{file} );
85 $data->{login} . ':' . $data->{file};
# Body fragment, presumably of file_set (sub header not visible).
# Serialises $data with encode_json and stores the JSON text in the client
# kept in $self->{files}. The line that computes the key $k is missing from
# this excerpt.
89 my ($self,$data) = @_;
91 my $json = encode_json $data;
92 $self->{files}->set( $k => $json );
# Body fragment, presumably of file_get (sub header not visible).
# Looks up key $k in the client kept in $self->{files}; when a true value
# comes back it is decoded with decode_json and returned. The line that
# computes $k and whatever follows the if block are not visible here.
97 my ($self,$data) = @_;
99 if ( my $json = $self->{files}->get($k) ) {
100 return decode_json $json;
# Body fragment, presumably of modify_file (sub header not visible).
# When file_get finds an existing record, usage_decr is called before the
# file is registered again through new_file.
105 my ( $self,$data ) = @_;
107 if ( my $old = $self->file_get( $data ) ) {
# NOTE(review): the decrement is passed $data, not $old, so it uses the size
# of the incoming record -- confirm this is intended when sizes differ.
108 $self->usage_decr( $data );
111 $self->new_file($data);
# Body fragment, presumably of new_file (sub header not visible).
# Stores the record through file_set and then calls usage_incr with the
# same record.
115 my ( $self,$data ) = @_;
116 $self->file_set($data);
117 $self->usage_incr($data);
# Body fragment, presumably of remove_file (sub header not visible).
# Calls usage_decr with the record and deletes key $k from the client kept
# in $self->{files}. The line that computes $k is missing from this excerpt.
121 my ( $self, $data ) = @_;
122 $self->usage_decr( $data );
124 $self->{files}->delete( $k );
# Only the argument unpacking of this sub is visible in the excerpt;
# presumably it is make_dir, judging by its position -- TODO confirm.
128 my ( $self, $data ) = @_;
# Body fragment of the sub that dispatches on $data->{itemize} (sub name not
# visible; the line that sets $type, presumably from the regex capture, and
# the else lines are missing from this excerpt).
133 my ( $self,$data ) = @_;
# Path of the file under the per-login blob directory.
135 my $blob = "users/$data->{login}/blob";
136 my $path = "$blob/$data->{file}";
# First branch: itemize starts with c or > followed by f or d.
138 if ( $data->{itemize} =~ m/^[c>]([fd])/ ) { # received change/create
# Type f: register the file and run dedup on its path.
141 if ( $type eq 'f' ) {
142 $self->modify_file( $data );
143 $self->dedup( $data, $path );
# Type d: hand the record to make_dir.
144 } elsif ( $type eq 'd' ) {
145 $self->make_dir( $data );
# Any other type is fatal and the record is dumped into the message.
147 die "unknown type $type ", dump $data;
# Second branch: itemize contains the literal text *deleting.
149 } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
150 $self->remove_file($data);
# Body fragment, presumably of md5pool: a plain function (no $self) taking
# a file path and its md5. The sub header and the line between the two link
# calls (presumably an else) are not visible in this excerpt.
156 my ( $path, $md5 ) = @_;
# Pool directory, relative to the current working directory.
158 my $pool = 'md5'; # FIXME sharding?
159 mkdir $pool unless -e $pool;
# Pool already has this md5: move the file aside with a .dedup suffix and
# put a hard link to the pooled copy in its place.
# NOTE(review): the results of mkdir, rename and link are not checked, and no
# visible line removes the .dedup file.
161 if ( -e "$pool/$md5" ) {
162 warn "dedup hit $md5 $path\n";
163 my $dedup = $path . '.dedup';
164 rename $path, $dedup;
165 link "$pool/$md5", $path;
# Otherwise (presumably) the file itself becomes the pooled copy via a hard
# link named after its md5.
168 link $path, "$pool/$md5";
# A string of 32 spaces; $data->{md5} is compared against it further down
# before a file is handed to md5pool.
172 my $empty_md5 = " " x 32;
# Body fragment, presumably of dedup (it is called with the same argument
# list elsewhere in the file). The sub header, the read loop header, the
# declarations of $dir, $imported and $fake, and the else lines are missing
# from this excerpt.
175 my ( $self, $data, $path ) = @_;
# Special case: the uploaded file is named md5sum, optionally inside a
# directory. Its lines are then used to import files from the md5 pool.
177 if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
180 warn "IMPORT ", $data->{file}, "\n";
# NOTE(review): the result of open is not checked.
181 open(my $md5sum, '<', $path);
# Each line is split on the first run of whitespace into md5 and file name.
184 my ( $md5, $file ) = split(/\s+/,$_,2);
# Entries whose md5 is not in the pool are reported; presumably they are
# then skipped by a line not visible here -- TODO confirm.
185 if ( ! -e "md5/$md5" ) {
186 warn "MISSING $md5 $file\n";
# Target path under the blob directory of the login; $dir presumably comes
# from the capture of the md5sum match above.
189 my $new = "users/$data->{login}/blob/$dir$file";
191 # create path from md5sum file
# NOTE(review): my combined with a statement modifier is documented in
# perlsyn as having undefined behaviour; $only_dir is also used on the next
# line even when the match fails.
192 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
193 make_path $only_dir unless -d $only_dir;
# link returns true on success, so $imported counts the links created.
194 $imported += link "md5/$md5", $new;
196 login => $data->{login},
197 host => $data->{host},
198 file => $dir . $file,
# The record built above is registered as a new file and its path is passed
# to md5pool with the md5 read from the list.
202 $self->new_file($fake);
203 warn "fake ",dump($fake);
205 md5pool $new => $md5;
208 print "INFO imported $imported files from ",dump($data);
# Ordinary files (presumably the else branch): pool the file unless its md5
# equals $empty_md5.
211 if ( $data->{md5} ne $empty_md5 ) {
212 md5pool $path => $data->{md5};