f46d22bf09563451a9ebed5056da71dbc28638a3
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12
13 use lib 'lib';
14 use CloudStore::Couchbase;
15
16 my $dir   = $ENV{RSYNC_DIR}  || '/srv/cloudstore';
17 my $port  = $ENV{RSYNC_PORT} || 6501;
18 my $users = "users";
19
20 my $log_fifo = "$dir/var/$port.log";
21 my $pid_file = "$dir/var/$port.pid";
22 my $cfg_file = "$dir/var/$port.conf";
23
24 my $rsync = 'rsync';
25 $rsync = 'bin/rsync' if -x 'bin/rsync'; # use 3.1dev version!
26
27 my @transfer = qw(
28 timestamp:%t:timestamp
29 login:%m:text
30 port:$port:int
31 auth:%u:text
32 host:%h:text
33 pid:%p:int
34 perms:%B:text
35 itemize:%i:text
36 mtime:%M:timestamp
37 md5:%C:text
38 op:%o:text
39 size:%l:int
40 transfered:%b:int
41 file:%f:text
42 );
43
44 $transfer[2] = "port:$port:int"; # expand $port
45
46 my @transfer_names =          map { ( split(/:/,$_,3) )[0] } @transfer;
47 my $transfer_log   = join('|',map { ( split(/:/,$_,3) )[1] } @transfer );
48
49 if ( $ENV{SQL} ) {
50         print "CREATE TABLE rsync_transfer (\n\t",
51         join(",\n\t", map { my @m = split(/:/,$_,3); "$m[0] $m[2]" } @transfer),
52         "\n);\n";
53         exit 1;
54 }
55
56 mkdir "$dir/var" if ! -e "$dir/var";
57
58 unlink $log_fifo if -f $log_fifo;
59 mkfifo $log_fifo, 0700 unless -p $log_fifo;
60
61 my $rsync_config = qq{
62
63 #uid = nobody
64 #gid = nogroup
65 #use chroot = yes
66 use chroot = no
67
68 #max connections = 4
69 lock file = $dir/var/$port.lock
70
71 #syslog facility = local5
72 log file  = $log_fifo
73
74 transfer logging = yes
75 log format = transfer-log:$transfer_log
76 max verbosity = 5
77
78 pid file  = $pid_file
79
80 # don't check secrets file permission (uid)
81 strict modes = no
82
83 #pre-xfer exec = /srv/cloudstore/pre-xfer.sh
84 #post-xfer exec = /srv/cloudstore/post-xfer.sh
85
86 };
87
88 foreach my $path ( glob "$users/*" ) {
89
90         my $login = $path;
91         $login =~ s{^.+/([^/]+)$}{$1}; 
92
93         if ( -d $path && -d "$path/blob" && -f "$path/secrets" ) {
94                 my @secrets = map { chomp; $_ } read_file "$path/secrets";
95                 my $auth_users = join(', ', map { s/:.+$//; $_ } @secrets );
96
97                 $rsync_config .= <<__RSYNC_MODULE__;
98
99 [$login]
100         path = $dir/users/$login/blob
101         auth users = $auth_users
102         secrets file = $dir/users/$login/secrets
103         read only = false
104
105 __RSYNC_MODULE__
106
107                 print "INFO: added $login = $auth_users\n";
108
109         } else {
110                 warn "skipped $login: $!";
111         }
112
113 }
114
115 write_file $cfg_file, $rsync_config;
116 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
117
118 if ( -e $pid_file ) {
119         my $pid = read_file $pid_file;
120         chomp($pid);
121         if ( kill 0, $pid ) {
122                 warn "found rsync pid $pid\n";
123                 kill 2, $pid;
124                 while ( -e $pid_file ) {
125                         warn "waiting for rsync to die...\n";
126                         sleep 1;
127                 }
128                 
129                 open(my $fifo, '<', $log_fifo);
130                 while ( kill 0, $pid ) {
131                         my $line = <$fifo>;
132                         warn "<<< $line\n";
133                 }
134
135                 kill 0, $pid && die "can't kill it!";
136         } else {
137                 unlink $pid_file;
138         }
139 }
140
141 use POSIX ":sys_wait_h";
142 sub REAPER {
143         my $child;
144         while ((my $waitedpid = waitpid(-1,WNOHANG)) > 0) {
145                 warn "reaped $waitedpid" . ($? ? " with exit $?" : '');
146         }
147         $SIG{CHLD} = \&REAPER;  # loathe SysV
148 }
149
150 $SIG{CHLD} = \&REAPER;
151
152
153 if ( ! -e $pid_file ) {
154         my $exec = "$rsync --daemon --config $cfg_file --no-detach --port=$port";
155         warn "START $exec\n";
156
157         die "could not fork\n" unless defined(my $pid = fork);
158         unless ($pid) {
159                 warn "start server with $exec\n";
160                 exec $exec || die $!;
161         }
162
163         warn "wait for pid file";
164         while ( ! -e $pid_file ) {
165                 sleep 1;
166         }
167 }
168
169 =for gearman
170 use Gearman::Client;
171 my $gearman = Gearman::Client->new;
172 $gearman->job_servers('127.0.0.1:4730');
173 =cut
174
175 while(1) {
176         warn "# reading log output from $log_fifo\n";
177         open(my $fifo, '<', $log_fifo);
178         my $rsync = CloudStore::Couchbase->new;
179         while( my $line = <$fifo> ) {
180                 die $line if $line =~ /rsync error:/;
181                 chomp $line;
182                 print $line, $/;
183
184                 if ( $line =~ /transfer-log:(.+)/ ) {
185                         my $transfer = $1;
186                         $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g;
187                         my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\|]/, $transfer, 6 );
188                         my $host = $1 if $login =~ s/\+(.+)//;
189
190                         my $path = "users/$login/log";
191                         mkdir $path unless -d $path;
192                         $path .= "/$yyyy-$mm-$dd";
193                         my $new_log = ! -e $path;
194                         open( my $log, '>>', $path );
195                         print $log join('|',@transfer),"\n" if $new_log; # store header
196                         print $log "$transfer\n";
197                         close $log;
198
199                         my @v = split(/\|/,$transfer,$#transfer + 1);
200                         my %data;
201                         @data{@transfer_names} = @v ; # FIXME validate?
202
203                         print ">>> data ",dump( \%data ) if $ENV{DEBUG};
204
205                         $rsync->transfer( \%data );
206 =for gearman
207                         $gearman->dispatch_background( 'rsync_transfer' => $json );
208 =cut
209
210                 }
211         }
212         close($fifo);
213         sleep 1;
214 }
215