d5511d3978bbc23f3d88ad188c0a1d6073c3ad12
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12 use Module::Refresh;
13
14 use lib 'lib';
15 use CloudStore::Couchbase;
16
17 my $dir   = $ENV{RSYNC_DIR}  || '/srv/cloudstore';
18 my $port  = $ENV{RSYNC_PORT} || 6501;
19 my $users = "users";
20 my $default_quota = $ENV{QUOTA} || 200 * 1024; # 200 Kb for test.sh
21
22 my $log_fifo = "$dir/var/$port.log";
23 my $pid_file = "$dir/var/$port.pid";
24 my $cfg_file = "$dir/var/$port.conf";
25
26 my $rsync = 'rsync';
27 $rsync = 'bin/rsync' if -x 'bin/rsync'; # use 3.1dev version!
28
29 my @transfer = qw(
30 timestamp:%t:timestamp
31 login:%m:text
32 port:$port:int
33 auth:%u:text
34 host:%h:text
35 pid:%p:int
36 perms:%B:text
37 itemize:%i:text
38 mtime:%M:timestamp
39 md5:%C:text
40 op:%o:text
41 size:%l:int
42 transfered:%b:int
43 file:%f:text
44 );
45
46 $transfer[2] = "port:$port:int"; # expand $port
47
48 my @transfer_names =          map { ( split(/:/,$_,3) )[0] } @transfer;
49 my $transfer_log   = join('|',map { ( split(/:/,$_,3) )[1] } @transfer );
50
51 if ( $ENV{SQL} ) {
52         print "CREATE TABLE rsync_transfer (\n\t",
53         join(",\n\t", map { my @m = split(/:/,$_,3); "$m[0] $m[2]" } @transfer),
54         "\n);\n";
55         exit 1;
56 }
57
58 my $store = CloudStore::Couchbase->new;
59
60 mkdir "$dir/var" if ! -e "$dir/var";
61
62 unlink $log_fifo if -f $log_fifo;
63 mkfifo $log_fifo, 0700 unless -p $log_fifo;
64
65 my $rsync_config = qq{
66
67 #uid = nobody
68 #gid = nogroup
69 #use chroot = yes
70 use chroot = no
71
72 #max connections = 4
73 lock file = $dir/var/$port.lock
74
75 #syslog facility = local5
76 log file  = $log_fifo
77
78 transfer logging = yes
79 log format = transfer-log:$transfer_log
80 max verbosity = 5
81
82 pid file  = $pid_file
83
84 # don't check secrets file permission (uid)
85 strict modes = no
86
87 pre-xfer exec = /srv/cloudstore/rsync-xfer-trigger.pl
88 post-xfer exec = /srv/cloudstore/rsync-xfer-trigger.pl
89
90 };
91
92 foreach my $path ( glob "$users/*" ) {
93
94         my $login = $path;
95         $login =~ s{^.+/([^/]+)$}{$1}; 
96
97         if ( -d $path && -d "$path/blob" && -f "$path/secrets" ) {
98                 my @secrets = map { chomp; $_ } read_file "$path/secrets";
99                 my $auth_users = join(', ', map { s/:.+$//; $_ } @secrets );
100
101                 $rsync_config .= <<__RSYNC_MODULE__;
102
103 [$login]
104         path = $dir/users/$login/blob
105         auth users = $auth_users
106         secrets file = $dir/users/$login/secrets
107         read only = false
108
109 __RSYNC_MODULE__
110
111                 print "INFO: added $login = $auth_users\n";
112
113                 my $quota = $default_quota;
114                 $quota = read_file("$path/quota") * 1 if -e "$path/quota";
115
116                 $store->user_set({
117                         login => $login,
118                         path => $path,
119                         secrets => [ @secrets ],
120                         quota => $quota,
121                         port => $port,
122                 });
123
124         } else {
125                 warn "skipped $login: $!";
126         }
127
128 }
129
130 write_file $cfg_file, $rsync_config;
131 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
132
133 sub rsync_running_pid {
134         return unless -e $pid_file;
135         my $pid = read_file $pid_file;
136         chomp($pid);
137         return $pid;
138 }
139
140 if ( my $pid = rsync_running_pid ) {
141         if ( kill 0, $pid ) {
142                 warn "found rsync pid $pid\n";
143                 kill 2, $pid;
144                 while ( -e $pid_file ) {
145                         warn "waiting for rsync to die...\n";
146                         sleep 1;
147                 }
148                 
149                 open(my $fifo, '<', $log_fifo);
150                 while ( kill 0, $pid ) {
151                         my $line = <$fifo>;
152                         warn "<<< $line\n";
153                 }
154
155                 kill 0, $pid && die "can't kill it!";
156         } else {
157                 unlink $pid_file;
158         }
159 }
160
161 use POSIX ":sys_wait_h";
162 sub REAPER {
163         my $child;
164         while ((my $waitedpid = waitpid(-1,WNOHANG)) > 0) {
165                 warn "reaped $waitedpid" . ($? ? " with exit $?" : '');
166         }
167         $SIG{CHLD} = \&REAPER;  # loathe SysV
168 }
169
170 $SIG{CHLD} = \&REAPER;
171
172
173 if ( ! -e $pid_file ) {
174         my $exec = "$rsync --daemon --config $cfg_file --no-detach --port=$port";
175         warn "START $exec\n";
176
177         die "could not fork\n" unless defined(my $pid = fork);
178         unless ($pid) {
179                 warn "start server with $exec\n";
180                 exec $exec || die $!;
181         }
182
183         warn "wait for pid file";
184         while ( ! -e $pid_file ) {
185                 sleep 1;
186         }
187 }
188
189 =for gearman
190 use Gearman::Client;
191 my $gearman = Gearman::Client->new;
192 $gearman->job_servers('127.0.0.1:4730');
193 =cut
194
195 while(1) {
196         die "no rsync running" unless kill 0, rsync_running_pid;
197         warn "# reading log output from $log_fifo\n";
198         open(my $fifo, '<', $log_fifo);
199         while( my $line = <$fifo> ) {
200                 Module::Refresh->refresh;
201                 chomp $line;
202                 print $line, $/;
203
204                 if ( $line =~ /transfer-log:(.+)/ ) {
205                         my $transfer = $1;
206                         $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g;
207                         my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\|]/, $transfer, 6 );
208                         my $host = $1 if $login =~ s/\+(.+)//;
209
210                         my $path = "users/$login/log";
211                         mkdir $path unless -d $path;
212                         $path .= "/$yyyy-$mm-$dd";
213                         my $new_log = ! -e $path;
214                         open( my $log, '>>', $path );
215                         print $log join('|',@transfer),"\n" if $new_log; # store header
216                         print $log "$transfer\n";
217                         close $log;
218
219                         my @v = split(/\|/,$transfer,$#transfer + 1);
220                         my %data;
221                         @data{@transfer_names} = @v ; # FIXME validate?
222
223                         print ">>> data ",dump( \%data ) if $ENV{DEBUG};
224
225                         $store->transfer( \%data );
226 =for gearman
227                         $gearman->dispatch_background( 'rsync_transfer' => $json );
228 =cut
229
230                         die "no rsync running" unless kill 0, rsync_running_pid;
231                 }
232         }
233         close($fifo);
234         sleep 1;
235 }
236