display and/or create md5sum in user.md5 xattr
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12 use Module::Refresh;
13
14 use lib 'lib';
15 use WarnColor;
16 use CloudStore::Store;
17
18 my $slice = $ARGV[0] || 's1';
19
20 my ( undef, $dir, $port, undef ) = getgrnam($slice);
21
22 my $log_fifo = "$dir/$port.log";
23 my $pid_file = "$dir/$port.pid";
24 my $cfg_file = "$dir/$port.conf";
25
26 my $rsync = 'rsync';
27 $rsync = 'bin/rsync' if -x 'bin/rsync'; # use 3.1dev version!
28
29 my @transfer = qw(
30 timestamp:%t:timestamp
31 login:%m:text
32 port:$port:int
33 auth:%u:text
34 host:%h:text
35 pid:%p:int
36 perms:%B:text
37 itemize:%i:text
38 mtime:%M:timestamp
39 md5:%C:text
40 op:%o:text
41 size:%l:int
42 transfered:%b:int
43 file:%f:text
44 );
45
46 $transfer[2] = "port:$port:int"; # expand $port
47
48 my @transfer_names =          map { ( split(/:/,$_,3) )[0] } @transfer;
49 my $transfer_log   = join('|',map { ( split(/:/,$_,3) )[1] } @transfer );
50
51 if ( $ENV{SQL} ) {
52         warn "CREATE TABLE rsync_transfer (\n\t",
53         join(",\n\t", map { my @m = split(/:/,$_,3); "$m[0] $m[2]" } @transfer),
54         "\n);\n";
55         exit 1;
56 }
57
58 my $store = CloudStore::Store->new( $slice );
59
60 unlink $log_fifo if -f $log_fifo;
61 mkfifo $log_fifo, 0700 unless -p $log_fifo;
62
63 sub rsync_rebuild_config {
64
65 my $rsync_config = qq{
66
67 #uid = nobody
68 #gid = nogroup
69 #use chroot = yes
70 use chroot = no
71
72 #max connections = 4
73 lock file = $dir/$port.lock
74
75 #syslog facility = local5
76 log file  = $log_fifo
77
78 transfer logging = yes
79 log format = transfer-log:$transfer_log
80 max verbosity = 5
81
82 pid file  = $pid_file
83
84 # don't check secrets file permission (uid)
85 strict modes = no
86
87 #pre-xfer exec = /srv/cloudstore/rsync-xfer-trigger.pl
88 #post-xfer exec = /srv/cloudstore/rsync-xfer-trigger.pl
89
90 # inplace breaks update of deduped files
91 refuse options = inplace
92
93 };
94
95 open(my $p, '<', '/var/lib/extrausers/passwd');
96 while(<$p>) {
97         chomp;
98
99         my ( $login, undef, $uid, $gid, $email, $path, $shell ) = split(/:/,$_,7);
100
101         if ( -d $path && -f "$path/.secrets" && ! -e "$path/.meta/secrets" ) {
102                 $store->api->mkbasepath( "$path/.meta/secrets" );
103                 rename "$path/.secrets", "$path/.meta/secrets";
104                 warn "UPGRADE $login rsync secrets location\n";
105         }
106
107         if ( -d $path && -f "$path/.meta/secrets" ) {
108                 my @secrets = map { chomp; $_ } read_file "$path/.meta/secrets";
109                 my $auth_users = join(', ', map { s/:.+$//; $_ } @secrets );
110
111                 $rsync_config .= <<__RSYNC_MODULE__;
112
113 [$login]
114         path = $path
115         auth users = $auth_users
116         secrets file = $path/.meta/secrets
117         read only = false
118         uid = $uid
119         gid = $gid
120         filter = - /.meta
121 #       refuse options = c delete
122 #       dont compress = *
123         incoming chmod = u=rwX,g+rX,o+rX
124
125
126 __RSYNC_MODULE__
127
128                 warn "INFO: added $login = $auth_users\n";
129
130         } else {
131                 warn "skipped $login: $!";
132         }
133
134 }
135
136 write_file $cfg_file, $rsync_config;
137 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
138
139 } # sub rsync_rebuild_config
140
141 rsync_rebuild_config;
142
143 sub rsync_running_pid {
144         return unless -e $pid_file;
145         my $pid = read_file $pid_file;
146         chomp($pid);
147         return $pid;
148 }
149
150 if ( my $pid = rsync_running_pid ) {
151         if ( kill 0, $pid ) {
152                 warn "found rsync pid $pid\n";
153                 kill 1, $pid && warn "reload config";
154 =for kill-rsync
155                 kill 2, $pid;
156                 while ( -e $pid_file ) {
157                         warn "waiting for rsync to die...\n";
158                         sleep 1;
159                 }
160                 
161                 open(my $fifo, '<', $log_fifo);
162                 while ( kill 0, $pid ) {
163                         my $line = <$fifo>;
164                         warn "<<< $line\n";
165                 }
166
167                 kill 0, $pid && die "can't kill it!";
168 =cut
169         } else {
170                 unlink $pid_file;
171         }
172 }
173
174 use POSIX ":sys_wait_h";
175 sub REAPER {
176         my $child;
177         while ((my $waitedpid = waitpid(-1,WNOHANG)) > 0) {
178                 warn "reaped $waitedpid" . ($? ? " with exit $?" : '');
179         }
180         $SIG{CHLD} = \&REAPER;  # loathe SysV
181 }
182
183 $SIG{CHLD} = \&REAPER;
184
185
186 if ( ! -e $pid_file || ! kill( 0, rsync_running_pid ) ) {
187         my $exec = "$rsync --daemon --config $cfg_file --no-detach --port=$port";
188         warn "START $exec\n";
189
190         die "could not fork\n" unless defined(my $pid = fork);
191         unless ($pid) {
192                 warn "start server with $exec\n";
193                 exec $exec || die $!;
194         }
195
196         warn "wait for pid file";
197         while ( ! -e $pid_file ) {
198                 sleep 1;
199         }
200 }
201
202 =for gearman
203 use Gearman::Client;
204 my $gearman = Gearman::Client->new;
205 $gearman->job_servers('127.0.0.1:4730');
206 =cut
207
208 while(1) {
209         die "no rsync running" unless kill 0, rsync_running_pid;
210         warn "waiting for log from $log_fifo\n";
211         open(my $fifo, '<', $log_fifo);
212         while( my $line = <$fifo> ) {
213                 Module::Refresh->refresh;
214                 chomp $line;
215                 warn $line, $/;
216
217                 if ( $line =~ /\[(\d+)\] transfer-log:(.+)/ ) {
218                         my $pid = $1;
219                         my $transfer = $2;
220                         $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g;
221                         my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\|]/, $transfer, 6 );
222                         my $host = $1 if $login =~ s/\+(.+)//;
223
224 if(0) {
225                         my $path = "users/$login/log";
226                         mkdir $path unless -d $path;
227                         $path .= "/$yyyy-$mm-$dd";
228                         my $new_log = ! -e $path;
229                         open( my $log, '>>', $path );
230                         print $log join('|',@transfer),"\n" if $new_log; # store header
231                         print $log "$transfer\n";
232                         close $log;
233 }
234
235                         my @v = split(/\|/,$transfer,$#transfer + 1);
236                         my %data;
237                         @data{@transfer_names} = @v ; # FIXME validate?
238
239                         $data{pid} = $pid;
240                         # overwrite pid from transfer log with consistant one for start/stop
241
242                         print ">>> data ",dump( \%data ) if $ENV{DEBUG};
243
244                         $store->rsync_transfer( \%data );
245 =for gearman
246                         $gearman->dispatch_background( 'rsync_transfer' => $json );
247 =cut
248
249                         die "no rsync running" unless kill 0, rsync_running_pid;
250                 } elsif ( $line =~ m/(unknown module|rebuild|reload|config)/ ) {
251                         warn "refresh modules, rebuild config and HUP rsync";
252                         Module::Refresh->refresh;
253                         rsync_rebuild_config;
254                         my $pid = rsync_running_pid;
255                         kill 1, $pid && warn "reload config";
256                 } else {
257                         $store->rsync_log( $line );
258                 }
259         }
260         close($fifo);
261         sleep 1;
262 }
263