experimental Couchbase storage API
authorDobrica Pavlinusic <dpavlin@rot13.org>
Sat, 3 Sep 2011 15:14:19 +0000 (15:14 +0000)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Sat, 3 Sep 2011 15:28:06 +0000 (15:28 +0000)
lib/CloudStore/memcache.pm
lib/CloudStore/rsync.pm [new file with mode: 0644]
rsync-piper.pl
t/rsync.t [new file with mode: 0755]

index bd4be08..e90a865 100644 (file)
@@ -6,7 +6,8 @@ use JSON::XS;
 sub new {
        my ($class) = @_;
        my $server = new Cache::Memcached {
-               'servers' => [ "127.0.0.1:11211" ],
+#              'servers' => [ "127.0.0.1:11211" ],
+               'servers' => [ "127.0.0.1:11222" ],
                'debug' => 1,
        #       'compress_threshold' => 10_000,
        };
diff --git a/lib/CloudStore/rsync.pm b/lib/CloudStore/rsync.pm
new file mode 100644 (file)
index 0000000..eed3733
--- /dev/null
@@ -0,0 +1,194 @@
+package CloudStore::rsync;
+use warnings;
+use strict;
+
+use autodie;
+use JSON::XS;
+use File::Path qw();
+use File::Slurp qw();
+use Cache::Memcached;
+use Digest::MD5 qw(md5_base64);
+use Data::Dump qw(dump);
+
+my $buckets = {
+       files => 5800,
+       usage => 5801,
+};
+
+sub new {
+       my ($class) = @_;
+
+       my $self = {};
+       bless $self, $class;
+
+       foreach my $bucket ( keys %$buckets ) {
+               my $port = $buckets->{$bucket};
+               my $server = new Cache::Memcached {
+                       'servers' => [ "127.0.0.1:$port" ],
+                       'debug' => 1,
+               #       'compress_threshold' => 10_000,
+               };
+               #$server->set_servers($array_ref);
+               #$server->set_compress_threshold(10_000);
+               $server->enable_compress(0);
+               $self->{$bucket} = $server;
+
+       }
+
+       warn "# new ",dump $self;
+
+       return $self;
+}
+
+sub usage_decr {
+       my ($self,$data) = @_;
+       $self->{usage}->decr( $data->{login} => $data->{size} );
+}
+
+sub usage_incr {
+       my ($self,$data) = @_;
+       $self->{usage}->incr( $data->{login} => $data->{size} ) ||
+       $self->{usage}->set( $data->{login} => $data->{size} );
+}
+
+sub usage {
+       my ($self,$data) = @_;
+       $self->{usage}->get( $data->{login} );
+}
+
+sub file_set {
+       my ($self,$data) = @_;
+       my $k = md5_base64( $data->{login} . '/' . $data->{file} );
+       my $json = encode_json $data;
+       $self->{files}->set( $k => $json );
+       return $json;
+}
+
+sub file_get {
+       my ($self,$data) = @_;
+       my $k = md5_base64( $data->{login} . '/' . $data->{file} );
+       if ( my $json = $self->{files}->get($k) ) {
+               return decode_json $json;
+       }
+}
+
+sub modify_file {
+       my ( $self,$data ) = @_;
+
+       if ( my $old = $self->file_get( $data ) ) {
+               $self->usage_decr( $data );
+       }
+
+       $self->new_file($data);
+}
+
+sub new_file {
+       my ( $self,$data ) = @_;
+       $self->file_set($data);
+       $self->usage_incr($data);
+}
+
+sub remove_file {
+       my ( $self, $data ) = @_;
+       $self->usage_decr( $data );
+       my $k = md5_base64( $data->{login} . '/' . $data->{file} );
+       $self->{files}->delete( $k );
+}
+
+sub make_dir {
+       my ( $self, $data ) = @_;
+
+}
+
+sub remove_dir {
+       my ( $self, $data ) = @_;
+
+}
+
+sub transfer {
+       my ( $self,$data ) = @_;
+
+       $data->{base64_path} ||= md5_base64( $data->{login} . '/' . $data->{file} );
+
+       my $blob = "users/$data->{login}/blob";
+        my $path = "$blob/$data->{file}";
+
+       if ( $data->{itemize} =~ m/^[c>]([fd])/ ) { # received change/create
+               my $type = $1;
+
+               if ( $type eq 'f' ) {
+                       $self->modify_file( $data );
+                       $self->dedup( $data, $path );
+               } elsif ( $type eq 'd' ) {
+                       $self->make_dir( $data );
+               } else {
+                       die "unknown type $type ", dump $data;
+               }
+       } elsif ( $data->{itemize} =~ m/\*deleting/ ) {
+               if ( -d $path ) {
+                       $self->remove_dir($data);
+               } elsif ( -f $path ) {
+                       $self->remove_file($data);
+               } else {
+                       die "unknown delete ", dump $data;
+               }
+       }
+       return $data;
+}
+
+sub md5pool {
+       my ( $path, $md5 ) = @_;
+
+       my $pool = 'md5'; # FIXME sharding?
+       mkdir $pool unless -e $pool;
+
+       if ( -e "$pool/$md5" ) {
+               warn "dedup hit $md5 $path\n";
+               my $dedup = $path . '.dedup';
+               rename $path, $dedup;
+               link "$pool/$md5", $path;
+               unlink $dedup;
+       } else {
+               link $path, "$pool/$md5";
+       }
+}
+
+my $empty_md5 = " " x 32;
+
+sub dedup {
+       my ( $self, $data, $path ) = @_;
+
+       if ( $data->{file} =~ /^(.+\/)?md5sum$/ ) {
+               my $dir = $1;
+               my $imported = 0;
+               warn "IMPORT ", $data->{file}, "\n";
+               open(my $md5sum, '<', $path);
+               while(<$md5sum>) {
+                       chomp;
+                       my ( $md5, $file ) = split(/\s+/,$_,2);
+                       if ( ! -e "md5/$md5" ) {
+                               warn "MISSING $md5 $file\n";
+                               next;
+                       }
+                       my $new = "users/$data->{login}/blob/$dir$file";
+                       if ( ! -e $new ) {
+                               # create path from md5sum file
+                               my $dir = $1 if $new =~ m{^(.+)/[^/]+$};
+                               make_path $dir unless -d $dir;
+                               $imported += link "md5/$md5", $new;
+                               $self->new_file( $data );
+                       } else {
+                               md5pool $new => $md5;
+                       }
+               }
+               print "INFO imported $imported files from ",dump($data);
+       }
+
+       if ( $data->{md5} ne $empty_md5 ) {
+               md5pool $path => $data->{md5};
+       } else {
+               
+       }
+}
+
+1;
index 767ab92..a02ae15 100755 (executable)
@@ -11,8 +11,7 @@ use Data::Dump qw(dump);
 use English;
 
 use lib 'lib';
-use CloudStore::JSON;
-use CloudStore::dedup;
+use CloudStore::rsync;
 
 my $dir   = $ENV{RSYNC_DIR}  || '/srv/cloudstore';
 my $port  = $ENV{RSYNC_PORT} || 6501;
@@ -176,6 +175,7 @@ $gearman->job_servers('127.0.0.1:4730');
 while(1) {
        warn "# reading log output from $log_fifo\n";
        open(my $fifo, '<', $log_fifo);
+       my $rsync = CloudStore::rsync->new;
        while( my $line = <$fifo> ) {
                die $line if $line =~ /rsync error:/;
                chomp $line;
@@ -202,10 +202,7 @@ while(1) {
 
                        print ">>> data ",dump( \%data ) if $ENV{DEBUG};
 
-                       CloudStore::dedup::data \%data; # uses deleted json files!
-
-                       my $json = CloudStore::JSON::rsync_transfer \%data;
-
+                       $rsync->transfer( \%data );
 =for gearman
                        $gearman->dispatch_background( 'rsync_transfer' => $json );
 =cut
diff --git a/t/rsync.t b/t/rsync.t
new file mode 100755 (executable)
index 0000000..01983a6
--- /dev/null
+++ b/t/rsync.t
@@ -0,0 +1,50 @@
+#!/usr/bin/perl
+use strict;
+use warnings;
+
+use Test::More tests => 6;
+use Data::Dump qw(dump);
+
+use lib 'lib';
+
+use_ok 'CloudStore::rsync';
+
+my $data =
+{
+  file       => "dir-test/bar",
+  itemize    => ">f+++++++++",
+  login      => "test",
+  md5        => "51ce99ec40129bfe1fd11d65b346d15e",
+  mtime      => "2011-07-17T17:55:15",
+  op         => "recv",
+  perms      => "rw-r--r--",
+  pid        => 29525,
+  port       => 6501,
+  size       => 10024,
+  timestamp  => "2011-07-17T17:55:15",
+  transfered => 10064,
+};
+
+ok my $r = CloudStore::rsync->new, 'new';
+
+my $json = '{"foo":42}';
+ok($r->{files}->set( 'test', $json ), 'files set');
+cmp_ok($r->{files}->get( 'test'), 'eq', $json, 'files get');
+
+ok my $j = $r->file_set($data), 'file_set';
+
+ok( my $f = $r->file_get($data), 'file_get' );
+diag dump $f;
+
+ok( my $u = $r->usage_incr($data), 'usage_incr' );
+diag dump $u;
+
+ok( my $u2 = $r->usage($data), 'usage' );
+cmp_ok $u2, '==', $u, 'usage correct';
+
+ok( my $u = $r->usage_decr($data), 'usage_incr' );
+cmp_ok $r->usage($data), '==', 0, 'empty';
+
+ok my $d = $r->transfer($data), 'transfer';
+diag dump($d);
+