use libnss-extrausers to provide uids for quota
[cloudstore.git] / lib / CloudStore / Couchbase.pm
1 package CloudStore::Couchbase;
2 use warnings;
3 use strict;
4
5 use autodie;
6 use JSON::XS;
7 use File::Path qw(make_path);
8 use File::Slurp qw();
9 use Cache::Memcached;
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
12 use LWP::Simple;
13 use Carp qw(confess);
14
15 use WarnColor;
16
17 my $buckets = {
18         users => 5800,
19         files => 5801,
20         session => 5802,
21 };
22
23 sub new {
24         my ($class) = @_;
25
26         my $self = {};
27         bless $self, $class;
28
29         foreach my $bucket ( keys %$buckets ) {
30                 my $port = $buckets->{$bucket};
31                 my $server = new Cache::Memcached {
32                         'servers' => [ "127.0.0.1:$port" ],
33                         'debug' => defined $ENV{DEBUG} && $ENV{DEBUG} > 3,
34                 #       'compress_threshold' => 10_000,
35                 };
36                 #$server->set_servers($array_ref);
37                 #$server->set_compress_threshold(10_000);
38                 $server->enable_compress(0);
39                 $self->{$bucket} = $server;
40
41         }
42
43         warn "# new ",dump $self if $ENV{DEBUG};
44
45         return $self;
46 }
47
48 sub json_set {
49         my ($self,$bucket,$key,$data) = @_;
50         confess "data not ref ",dump($data) unless ref $data;
51         my $json = encode_json $data;
52         $self->{$bucket}->set( $key => $json );
53         warn "## json_set $bucket $key $json\n";
54         return $json;
55 }
56
57 sub json_get {
58         my ($self,$bucket,$key,$data) = @_;
59         if ( my $json = $self->{$bucket}->get($key) ) {
60                 warn "## json_get $bucket $key $json\n";
61                 return decode_json $json;
62         }
63 }
64
65 sub user_set {
66         my ($self,$data) = @_;
67         $self->json_set( 'users', $data->{login}, $data );
68 }
69
70 sub user_get {
71         my ($self,$login) = @_;
72         $login = $login->{login} if ref $login;
73         my $user = $self->json_get( 'users', $login );
74         $user->{usage} = $self->usage( $login );
75         $user->{status} = $self->status( $login );
76         warn "## user ",dump($user) if $ENV{DEBUG};
77         return $user;
78 }
79
80 sub status {
81         my ($self,$login,$message) = @_;
82         $login = $login->{login} if ref $login;
83         if ( $message ) {
84                 $self->{session}->set( "$login:status" => $message );
85                 return $message;
86         } else {
87                 $self->{session}->get( "$login:status" );
88         }
89 }
90
91 sub usage_decr {
92         my ($self,$data) = @_;
93         $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
94 }
95
96 sub usage_incr {
97         my ($self,$data) = @_;
98         $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
99 }
100
101 sub usage {
102         my ($self,$login) = @_;
103         $login = $login->{login} if ref $login;
104         $self->{session}->get( $login . ':usage' );
105 }
106
107 sub couchdb {
108         my $self = shift @_;
109         my $fmt  = shift @_;
110         my $url = sprintf $fmt, @_;
111
112         warn "# couchdb $url\n";
113         if ( my $json = get $url ) {
114                 warn "## $url $json\n";
115                 my $r = decode_json $json;
116                 return $r;
117         }
118 }
119
120 sub usage_init {
121         my ($self,$login) = @_;
122         $login = $login->{login} if ref $login;
123
124         my $usage = 0;
125
126         if ( my $r = $self->couchdb(
127                 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"&stale=update_after'
128                 , $login
129                 , $login
130         )) {
131
132                 $usage = $r->{rows}->[0]->{value};
133                 $usage = 0 unless defined $usage;
134         }
135
136         $self->{session}->set( $login . ':usage' => $usage );
137 }
138
139 sub _file_key {
140         my $data = shift;
141         #md5_base64( $data->{login} . '/' . $data->{file} );
142         $data->{login} . ':' . $data->{file};
143 }
144
145 sub file_set {
146         my ($self,$data) = @_;
147         $self->json_set( 'files', _file_key($data), $data );
148 }
149
150 sub file_get {
151         my ($self,$data) = @_;
152         $self->json_get( 'files', _file_key($data) );
153 }
154
155 sub modify_file {
156         my ( $self,$data ) = @_;
157
158         if ( my $old = $self->file_get( $data ) ) {
159                 $self->usage_decr( $data );
160         }
161
162         $self->new_file($data);
163 }
164
165 sub new_file {
166         my ( $self,$data ) = @_;
167         $self->file_set($data);
168         $self->usage_incr($data);
169 }
170
171 sub remove_file {
172         my ( $self, $data ) = @_;
173         $self->usage_decr( $data );
174         my $k = _file_key $data;
175         $self->{files}->delete( $k );
176 }
177
178 sub make_dir {
179         my ( $self, $data ) = @_;
180
181 }
182
183 sub new_link {
184         my ( $self, $data ) = @_;
185
186         warn "# new_link ",dump $data;
187
188         if ( $data->{file} =~ m{^(.*/?)\.send/([^/]+)/(.+)$} ) {
189                 my ( $dir, $to, $name ) = ( $1, $2, $3 );
190                 my $path = "users/$data->{login}/blob/" . $data->{file};
191                 my $link_to = readlink $path;
192                 warn "$link_to";
193                 if ( $link_to =~ s{^\Q/rsyncd-munged/\E}{/} ) {
194
195                         my $s = $path;
196                         $s =~ s{/[^/]+$}{}; # strip filename
197                         while ( $link_to =~ s{/../}{/} ) {
198                                 $s =~ s{/[^/]+$}{} || die "can't strip $s";
199                                 warn "## simplify $s $link_to\n";
200                         }
201                         $s .= $link_to;
202
203                         my $d = "users/$to/blob";
204                         if ( ! -e $d ) {
205                                 warn "ERROR: no to user $to in $d";
206                                 return;
207                         }
208                         $d .= "/$name";
209
210                         # $name can contain directories so we must create them
211                         my $to_dir = $d;
212                         $to_dir =~ s{/[^/]+$}{};
213                         make_path $to_dir if ! -e $to_dir;
214
215                         if ( ! -e $s ) {
216                                 warn "ERROR: can't find source $s";
217                         } else {
218
219                                 warn "link $s -> $d\n";
220                                 link $s, $d;
221
222                                 my ($l,$f) = ($1,$2) if $s =~ m{users/([^/]+)/blob/(.+)};
223
224                                 my $origin = $self->file_get({
225                                         login => $l,
226                                         file  => $f,
227                                 });
228                                 $self->new_file($origin);
229                                 warn "INFO: sent file ",dump($origin);
230                         }
231
232
233                 } else {
234                         warn "ERROR: can't SEND To:$to Name:$name Link:$link_to";
235                 }
236         }
237 }
238
239 sub transfer {
240         my ( $self,$data ) = @_;
241
242         my $blob = "users/$data->{login}/blob";
243         my $path = "$blob/$data->{file}";
244
245         if ( $data->{itemize} =~ m/^[c>]([fdL])/ ) { # received change/create
246                 my $type = $1;
247
248                 if ( $type eq 'f' ) {
249                         $self->modify_file( $data );
250                         $self->dedup( $data, $path );
251                 } elsif ( $type eq 'd' ) {
252                         $self->make_dir( $data );
253                 } elsif ( $type eq 'L' ) {
254                         $self->new_link( $data );
255                 } else {
256                         die "unknown type $type ", dump $data;
257                 }
258         } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
259                 $self->remove_file($data);
260         }
261         return $data;
262 }
263
264 sub md5pool {
265         my ( $path, $md5 ) = @_;
266
267         my $pool = 'md5'; # FIXME sharding?
268         mkdir $pool unless -e $pool;
269
270         if ( -e "$pool/$md5" ) {
271                 warn "dedup hit $md5 $path\n";
272                 my $dedup = $path . '.dedup';
273                 rename $path, $dedup;
274                 link "$pool/$md5", $path;
275                 unlink $dedup;
276         } else {
277                 link $path, "$pool/$md5";
278         }
279 }
280
281 my $empty_md5 = " " x 32;
282
283 sub dedup {
284         my ( $self, $data, $path ) = @_;
285
286         if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
287                 my $dir = $1;
288                 my $imported = 0;
289                 warn "IMPORT ", $data->{file}, "\n";
290                 open(my $md5sum, '<', $path);
291                 while(<$md5sum>) {
292                         chomp;
293                         my ( $md5, $file ) = split(/\s+/,$_,2);
294                         if ( ! -e "md5/$md5" ) {
295                                 warn "MISSING $md5 $file\n";
296                                 next;
297                         }
298                         my $new = "users/$data->{login}/blob/$dir$file";
299                         if ( ! -e $new ) {
300                                 # create path from md5sum file
301                                 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
302                                 make_path $only_dir unless -d $only_dir;
303                                 $imported += link "md5/$md5", $new;
304                                 my $fake = {
305                                         login => $data->{login},
306                                         host => $data->{host},
307                                         file => $dir . $file,
308                                         md5 => $md5,
309                                         size => -s $new,
310                                 };
311                                 $self->new_file($fake);
312                                 warn "import from $path ",dump($fake);
313                         } else {
314                                 md5pool $new => $md5;
315                         }
316                 }
317                 print "INFO imported $imported files from ",dump($data);
318         }
319
320         if ( $data->{md5} ne $empty_md5 ) {
321                 md5pool $path => $data->{md5};
322         } else {
323                 
324         }
325 }
326
327 1;