ddd5c8b20ef5bad66400847b31f428728ab9d00c
[cloudstore.git] / lib / CloudStore / Couchbase.pm
1 package CloudStore::Couchbase;
2 use warnings;
3 use strict;
4
5 use autodie;
6 use JSON::XS;
7 use File::Path qw(make_path);
8 use File::Slurp qw();
9 use Cache::Memcached;
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
12 use LWP::Simple;
13 use Carp qw(confess);
14
15 my $buckets = {
16         users => 5800,
17         files => 5801,
18         session => 5802,
19 };
20
21 sub new {
22         my ($class) = @_;
23
24         my $self = {};
25         bless $self, $class;
26
27         foreach my $bucket ( keys %$buckets ) {
28                 my $port = $buckets->{$bucket};
29                 my $server = new Cache::Memcached {
30                         'servers' => [ "127.0.0.1:$port" ],
31                         'debug' => $ENV{DEBUG},
32                 #       'compress_threshold' => 10_000,
33                 };
34                 #$server->set_servers($array_ref);
35                 #$server->set_compress_threshold(10_000);
36                 $server->enable_compress(0);
37                 $self->{$bucket} = $server;
38
39         }
40
41         warn "# new ",dump $self if $ENV{DEBUG};
42
43         return $self;
44 }
45
46 sub json_set {
47         my ($self,$bucket,$key,$data) = @_;
48         confess "data not ref ",dump($data) unless ref $data;
49         my $json = encode_json $data;
50         $self->{$bucket}->set( $key => $json );
51         warn "## $bucket set $key $json\n";
52         return $json;
53 }
54
55 sub json_get {
56         my ($self,$bucket,$key,$data) = @_;
57         if ( my $json = $self->{$bucket}->get($key) ) {
58                 warn "## $bucket get $key $json\n";
59                 return decode_json $json;
60         }
61 }
62
63 sub user_set {
64         my ($self,$data) = @_;
65         $self->json_set( 'users', $data->{login}, $data );
66 }
67
68 sub user_get {
69         my ($self,$login) = @_;
70         $login = $login->{login} if ref $login;
71         my $user = $self->json_get( 'users', $login );
72         $user->{usage} = $self->usage( $login );
73         $user->{status} = $self->status( $login );
74         warn "## user ",dump($user);
75         return $user;
76 }
77
78 sub status {
79         my ($self,$login,$message) = @_;
80         $login = $login->{login} if ref $login;
81         if ( $message ) {
82                 $self->{session}->set( "$login:status" => $message );
83         } else {
84                 $self->{session}->get( "$login:status" );
85         }
86 }
87
88 sub usage_decr {
89         my ($self,$data) = @_;
90         $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
91 }
92
93 sub usage_incr {
94         my ($self,$data) = @_;
95         $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
96 }
97
98 sub usage {
99         my ($self,$login) = @_;
100         $login = $login->{login} if ref $login;
101         $self->{session}->get( $login . ':usage' );
102 }
103
104 sub couchdb {
105         my $self = shift @_;
106         my $fmt  = shift @_;
107         my $url = sprintf $fmt, @_;
108
109         warn "# couchdb $url\n";
110         if ( my $json = get $url ) {
111                 warn "## $url $json\n";
112                 my $r = decode_json $json;
113                 return $r;
114         }
115 }
116
117 sub usage_init {
118         my ($self,$login) = @_;
119         $login = $login->{login} if ref $login;
120
121         my $usage = 0;
122
123         if ( my $r = $self->couchdb(
124                 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
125                 , $login
126                 , $login
127         )) {
128
129                 $usage = $r->{rows}->[0]->{value};
130                 $usage = 0 unless defined $usage;
131         }
132
133         $self->{session}->set( $login . ':usage' => $usage );
134 }
135
136 sub _file_key {
137         my $data = shift;
138         #md5_base64( $data->{login} . '/' . $data->{file} );
139         $data->{login} . ':' . $data->{file};
140 }
141
142 sub file_set {
143         my ($self,$data) = @_;
144         $self->json_set( 'files', _file_key($data), $data );
145 }
146
147 sub file_get {
148         my ($self,$data) = @_;
149         $self->json_get( 'files', _file_key($data) );
150 }
151
152 sub modify_file {
153         my ( $self,$data ) = @_;
154
155         if ( my $old = $self->file_get( $data ) ) {
156                 $self->usage_decr( $data );
157         }
158
159         $self->new_file($data);
160 }
161
162 sub new_file {
163         my ( $self,$data ) = @_;
164         $self->file_set($data);
165         $self->usage_incr($data);
166 }
167
168 sub remove_file {
169         my ( $self, $data ) = @_;
170         $self->usage_decr( $data );
171         my $k = _file_key $data;
172         $self->{files}->delete( $k );
173 }
174
175 sub make_dir {
176         my ( $self, $data ) = @_;
177
178 }
179
180 sub new_link {
181         my ( $self, $data ) = @_;
182
183         warn "# new_link ",dump $data;
184
185         if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
186                 my ( $dir, $to, $name ) = ( $1, $2, $3 );
187                 my $path = "users/$data->{login}/blob/" . $data->{file};
188                 my $file = readlink $path;
189                 warn "## $path -> $file";
190                 if ( $file =~ s{^\Q/rsyncd-munged/../../\E}{$dir} ) {
191                         warn "SEND To:$to Name:$name File:$file\n";
192                         my $s = "users/$data->{login}/blob/$file";
193                         my $d = "users/$to/blob";
194                         die "no user $to" unless -e $d;
195                         $d .= "/$name";
196
197                         # since $name can contain directories we must create them
198                         my $to_dir = $d;
199                         $to_dir =~ s{/[^/]+$}{};
200                         make_path $to_dir if ! -e $to_dir;
201
202                         warn "link $s -> $d\n";
203                         link $s, $d;
204                 } else {
205                         warn "ERROR: can't SEND To:$to Name:$name File:$file";
206                 }
207         }
208 }
209
210 sub transfer {
211         my ( $self,$data ) = @_;
212
213         my $blob = "users/$data->{login}/blob";
214         my $path = "$blob/$data->{file}";
215
216         if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
217                 my $type = $1;
218
219                 if ( $type eq 'f' ) {
220                         $self->modify_file( $data );
221                         $self->dedup( $data, $path );
222                 } elsif ( $type eq 'd' ) {
223                         $self->make_dir( $data );
224                 } elsif ( $type eq 'L' ) {
225                         $self->new_link( $data );
226                 } else {
227                         die "unknown type $type ", dump $data;
228                 }
229         } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
230                 $self->remove_file($data);
231         }
232         return $data;
233 }
234
235 sub md5pool {
236         my ( $path, $md5 ) = @_;
237
238         my $pool = 'md5'; # FIXME sharding?
239         mkdir $pool unless -e $pool;
240
241         if ( -e "$pool/$md5" ) {
242                 warn "dedup hit $md5 $path\n";
243                 my $dedup = $path . '.dedup';
244                 rename $path, $dedup;
245                 link "$pool/$md5", $path;
246                 unlink $dedup;
247         } else {
248                 link $path, "$pool/$md5";
249         }
250 }
251
252 my $empty_md5 = " " x 32;
253
254 sub dedup {
255         my ( $self, $data, $path ) = @_;
256
257         if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
258                 my $dir = $1;
259                 my $imported = 0;
260                 warn "IMPORT ", $data->{file}, "\n";
261                 open(my $md5sum, '<', $path);
262                 while(<$md5sum>) {
263                         chomp;
264                         my ( $md5, $file ) = split(/\s+/,$_,2);
265                         if ( ! -e "md5/$md5" ) {
266                                 warn "MISSING $md5 $file\n";
267                                 next;
268                         }
269                         my $new = "users/$data->{login}/blob/$dir$file";
270                         if ( ! -e $new ) {
271                                 # create path from md5sum file
272                                 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
273                                 make_path $only_dir unless -d $only_dir;
274                                 $imported += link "md5/$md5", $new;
275                                 my $fake = {
276                                         login => $data->{login},
277                                         host => $data->{host},
278                                         file => $dir . $file,
279                                         md5 => $md5,
280                                         size => -s $new,
281                                 };
282                                 $self->new_file($fake);
283                                 warn "fake ",dump($fake);
284                         } else {
285                                 md5pool $new => $md5;
286                         }
287                 }
288                 print "INFO imported $imported files from ",dump($data);
289         }
290
291         if ( $data->{md5} ne $empty_md5 ) {
292                 md5pool $path => $data->{md5};
293         } else {
294                 
295         }
296 }
297
298 1;