2c9336703975aa3aa00cd0ac61734b93349319e8
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12
13 use lib 'lib';
14 use CloudStore::JSON;
15 use CloudStore::dedup;
16
17 my $dir   = $ENV{RSYNC_DIR}  || '/srv/cloudstore';
18 my $port  = $ENV{RSYNC_PORT} || 6501;
19 my $users = "users";
20
21 my $log_fifo = "$dir/var/$port.log";
22 my $pid_file = "$dir/var/$port.pid";
23 my $cfg_file = "$dir/var/$port.conf";
24
25 my $rsync = 'rsync';
26 $rsync = 'bin/rsync' if -x 'bin/rsync'; # use 3.1dev version!
27
28 my @transfer = qw(
29 timestamp:%t:timestamp
30 login:%u:text
31 port:$port:int
32 pid:%p:int
33 perms:%B:text
34 itemize:%i:text
35 mtime:%M:timestamp
36 md5:%C:text
37 op:%o:text
38 size:%l:int
39 transfered:%b:int
40 file:%f:text
41 );
42
43 $transfer[2] = "port:$port:int"; # expand $port
44
45 my @transfer_names =          map { ( split(/:/,$_,3) )[0] } @transfer;
46 my $transfer_log   = join('|',map { ( split(/:/,$_,3) )[1] } @transfer );
47
48 if ( $ENV{SQL} ) {
49         print "CREATE TABLE rsync_transfer (\n\t",
50         join(",\n\t", map { my @m = split(/:/,$_,3); "$m[0] $m[2]" } @transfer),
51         "\n);\n";
52         exit 1;
53 }
54
55 mkdir "$dir/var" if ! -e "$dir/var";
56
57 mkfifo $log_fifo, 0700 unless -p $log_fifo;
58
59 my $rsync_config = qq{
60
61 #uid = nobody
62 #gid = nogroup
63 #use chroot = yes
64 use chroot = no
65
66 #max connections = 4
67 lock file = $dir/var/$port.lock
68
69 #syslog facility = local5
70 log file  = $log_fifo
71
72 transfer logging = yes
73 log format = transfer-log:$transfer_log
74 max verbosity = 5
75
76 pid file  = $pid_file
77
78 # don't check secrets file permission (uid)
79 strict modes = no
80
81 #pre-xfer exec = /srv/cloudstore/pre-xfer.sh
82 #post-xfer exec = /srv/cloudstore/post-xfer.sh
83
84 };
85
86 foreach my $path ( glob "$users/*" ) {
87
88         my $login = $path;
89         $login =~ s{^.+/([^/]+)$}{$1}; 
90
91         if ( -d $path && -d "$path/blob" && -f "$path/secrets" ) {
92                 print "INFO: user $login added\n";
93
94                 $rsync_config .= <<__RSYNC_MODULE__;
95
96 [$login]
97         path = $dir/users/$login/blob
98         auth users = $login
99         secrets file = $dir/users/$login/secrets
100         read only = false
101
102 __RSYNC_MODULE__
103
104         } else {
105                 warn "skipped $login: $!";
106         }
107
108 }
109
110 write_file $cfg_file, $rsync_config;
111 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
112
113 if ( -e $pid_file ) {
114         my $pid = read_file $pid_file;
115         chomp($pid);
116         if ( kill 0, $pid ) {
117                 warn "found rsync pid $pid";
118         } else {
119                 unlink $pid_file;
120         }
121 }
122
123 if ( ! -e $pid_file ) {
124         my $exec = "$rsync --daemon --config $cfg_file --no-detach --port=$port";
125         warn "START $exec\n";
126
127         die "could not fork\n" unless defined(my $pid = fork);
128         unless ($pid) {
129                 warn "start server with $exec\n";
130                 exec $exec || die $!;
131         }
132
133         warn "wait for pid file";
134         while ( ! -e $pid_file ) {
135                 sleep 1;
136         }
137 }
138
139 use Gearman::Client;
140 my $gearman = Gearman::Client->new;
141 $gearman->job_servers('127.0.0.1:4730');
142
143 while(1) {
144         warn "# reading log output from $log_fifo\n";
145         open(my $fifo, '<', $log_fifo);
146         while( my $line = <$fifo> ) {
147                 chomp $line;
148                 print $line, $/;
149                 if ( $line =~ /transfer-log:(.+)/ ) {
150                         my $transfer = $1;
151                         $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g;
152                         my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\|]/, $transfer, 6 );
153
154                         my $path = "users/$login/log";
155                         mkdir $path unless -d $path;
156                         $path .= "/$yyyy-$mm-$dd";
157                         my $new_log = ! -e $path;
158                         open( my $log, '>>', $path );
159                         print $log join('|',@transfer),"\n" if $new_log; # store header
160                         print $log "$transfer\n";
161                         close $log;
162
163
164                         my @v = split(/\|/,$transfer,$#transfer + 1);
165                         my %data;
166                         @data{@transfer_names} = @v ; # FIXME validate?
167
168                         print ">>> data ",dump( \%data ) if $ENV{DEBUG};
169
170                         CloudStore::dedup::data \%data; # uses deleted json files!
171
172                         my $json = CloudStore::JSON::rsync_transfer \%data;
173
174                         $gearman->dispatch_background( 'rsync_transfer' => $json );
175
176                 }
177         }
178         close($fifo);
179         sleep 1;
180 }
181