added usage_init from view
[cloudstore.git] / lib / CloudStore / Couchbase.pm
1 package CloudStore::Couchbase;
2 use warnings;
3 use strict;
4
5 use autodie;
6 use JSON::XS;
7 use File::Path qw(make_path);
8 use File::Slurp qw();
9 use Cache::Memcached;
10 use Digest::MD5 qw(md5_base64);
11 use Data::Dump qw(dump);
12 use LWP::Simple;
13
14 my $buckets = {
15         files => 5800,
16         session => 5801,
17 };
18
19 sub new {
20         my ($class) = @_;
21
22         my $self = {};
23         bless $self, $class;
24
25         foreach my $bucket ( keys %$buckets ) {
26                 my $port = $buckets->{$bucket};
27                 my $server = new Cache::Memcached {
28                         'servers' => [ "127.0.0.1:$port" ],
29                         'debug' => $ENV{DEBUG},
30                 #       'compress_threshold' => 10_000,
31                 };
32                 #$server->set_servers($array_ref);
33                 #$server->set_compress_threshold(10_000);
34                 $server->enable_compress(0);
35                 $self->{$bucket} = $server;
36
37         }
38
39         warn "# new ",dump $self;
40
41         return $self;
42 }
43
44 sub usage_decr {
45         my ($self,$data) = @_;
46         $self->{session}->decr( $data->{login} . ':usage' => $data->{size} );
47 }
48
49 sub usage_incr {
50         my ($self,$data) = @_;
51         $self->{session}->incr( $data->{login} . ':usage' => $data->{size} );
52 }
53
54 sub usage {
55         my ($self,$data) = @_;
56         $self->{session}->get( $data->{login} . ':usage' );
57 }
58
59 sub usage_init {
60         my ($self,$data) = @_;
61
62         my $usage = 0;
63
64         my $url = sprintf
65                 'http://localhost:5984/files/_design/files/_view/login_usage?group=true&connection_timeout=60000&limit=1&skip=0&start_key="%s"&end_key="%s"'
66                 , $data->{login}
67                 , $data->{login}
68         ;
69
70         warn "usage from $url";
71         if ( my $json = get $url ) {
72                 warn "# JSON = $json\n";
73                 my $r = decode_json $json;
74                 warn dump $r;
75                 $usage = $r->{rows}->[0]->{value};
76                 $usage = 0 unless defined $usage;
77         }
78
79         $self->{session}->set( $data->{login} . ':usage' => $usage );
80 }
81
82 sub _key {
83         my $data = shift;
84         #md5_base64( $data->{login} . '/' . $data->{file} );
85         $data->{login} . ':' . $data->{file};
86 }
87
88 sub file_set {
89         my ($self,$data) = @_;
90         my $k = _key $data;
91         my $json = encode_json $data;
92         $self->{files}->set( $k => $json );
93         return $json;
94 }
95
96 sub file_get {
97         my ($self,$data) = @_;
98         my $k = _key $data;
99         if ( my $json = $self->{files}->get($k) ) {
100                 return decode_json $json;
101         }
102 }
103
104 sub modify_file {
105         my ( $self,$data ) = @_;
106
107         if ( my $old = $self->file_get( $data ) ) {
108                 $self->usage_decr( $data );
109         }
110
111         $self->new_file($data);
112 }
113
114 sub new_file {
115         my ( $self,$data ) = @_;
116         $self->file_set($data);
117         $self->usage_incr($data);
118 }
119
120 sub remove_file {
121         my ( $self, $data ) = @_;
122         $self->usage_decr( $data );
123         my $k = _key $data;
124         $self->{files}->delete( $k );
125 }
126
127 sub make_dir {
128         my ( $self, $data ) = @_;
129
130 }
131
132 sub transfer {
133         my ( $self,$data ) = @_;
134
135         my $blob = "users/$data->{login}/blob";
136         my $path = "$blob/$data->{file}";
137
138         if ( $data->{itemize} =~ m/^[c>]([fd])/ ) { # received change/create
139                 my $type = $1;
140
141                 if ( $type eq 'f' ) {
142                         $self->modify_file( $data );
143                         $self->dedup( $data, $path );
144                 } elsif ( $type eq 'd' ) {
145                         $self->make_dir( $data );
146                 } else {
147                         die "unknown type $type ", dump $data;
148                 }
149         } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
150                 $self->remove_file($data);
151         }
152         return $data;
153 }
154
155 sub md5pool {
156         my ( $path, $md5 ) = @_;
157
158         my $pool = 'md5'; # FIXME sharding?
159         mkdir $pool unless -e $pool;
160
161         if ( -e "$pool/$md5" ) {
162                 warn "dedup hit $md5 $path\n";
163                 my $dedup = $path . '.dedup';
164                 rename $path, $dedup;
165                 link "$pool/$md5", $path;
166                 unlink $dedup;
167         } else {
168                 link $path, "$pool/$md5";
169         }
170 }
171
172 my $empty_md5 = " " x 32;
173
174 sub dedup {
175         my ( $self, $data, $path ) = @_;
176
177         if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
178                 my $dir = $1;
179                 my $imported = 0;
180                 warn "IMPORT ", $data->{file}, "\n";
181                 open(my $md5sum, '<', $path);
182                 while(<$md5sum>) {
183                         chomp;
184                         my ( $md5, $file ) = split(/\s+/,$_,2);
185                         if ( ! -e "md5/$md5" ) {
186                                 warn "MISSING $md5 $file\n";
187                                 next;
188                         }
189                         my $new = "users/$data->{login}/blob/$dir$file";
190                         if ( ! -e $new ) {
191                                 # create path from md5sum file
192                                 my $only_dir = $1 if $new =~ m{^(.+)/[^/]+$};
193                                 make_path $only_dir unless -d $only_dir;
194                                 $imported += link "md5/$md5", $new;
195                                 my $fake = {
196                                         login => $data->{login},
197                                         host => $data->{host},
198                                         file => $dir . $file,
199                                         md5 => $md5,
200                                         size => -s $new,
201                                 };
202                                 $self->new_file($fake);
203                                 warn "fake ",dump($fake);
204                         } else {
205                                 md5pool $new => $md5;
206                         }
207                 }
208                 print "INFO imported $imported files from ",dump($data);
209         }
210
211         if ( $data->{md5} ne $empty_md5 ) {
212                 md5pool $path => $data->{md5};
213         } else {
214                 
215         }
216 }
217
218 1;