X-Git-Url: http://git.rot13.org/?p=cloudstore.git;a=blobdiff_plain;f=rsync-piper.pl;h=04c3a8a6b44be031cb43885563a40898e8f99d80;hp=fc4c560f16a8a5b9c1f34ca20c59aa6cafdaf7f3;hb=ddf7f5d29633547c4dc83c67fdd55f3995375a72;hpb=63715b97542b9258da2e2c2a201f0b4f97188f89 diff --git a/rsync-piper.pl b/rsync-piper.pl index fc4c560..04c3a8a 100755 --- a/rsync-piper.pl +++ b/rsync-piper.pl @@ -8,41 +8,59 @@ use File::Slurp; use IO::Select; use Time::HiRes; use Data::Dump qw(dump); +use English; +use Module::Refresh; -my $dir = '/srv/cloudstore/var'; -my $log_fifo = "$dir/rsyncd.log"; -our $pid_file = "$dir/rsyncd.pid"; -my $cfg_file = "$dir/rsyncd.conf"; -my $users = "users"; +use lib 'lib'; +use WarnColor; +use CloudStore::Store; -mkdir $dir if ! -e $dir; +my $slice = $ARGV[0] || 's1'; -unlink $log_fifo if -e $log_fifo; -mkfifo $log_fifo, 0700; +my ( undef, $dir, $port, undef ) = getgrnam($slice); -sub kill_rsync { - return unless -e $pid_file; - my $pid = read_file $pid_file; - warn "kill_rsync $pid_file = $pid"; - kill 2, $pid && warn "kill INT $pid old rsync server"; - kill 0, $pid && unlink $pid_file && warn "removed $pid_file"; +my $log_fifo = "$dir/$port.log"; +my $pid_file = "$dir/$port.pid"; +my $cfg_file = "$dir/$port.conf"; + +my $rsync = 'rsync'; +$rsync = 'bin/rsync' if -x 'bin/rsync'; # use 3.1dev version! + +my @transfer = qw( +timestamp:%t:timestamp +login:%m:text +port:$port:int +auth:%u:text +host:%h:text +pid:%p:int +perms:%B:text +itemize:%i:text +mtime:%M:timestamp +md5:%C:text +op:%o:text +size:%l:int +transfered:%b:int +file:%f:text +); + +$transfer[2] = "port:$port:int"; # expand $port + +my @transfer_names = map { ( split(/:/,$_,3) )[0] } @transfer; +my $transfer_log = join('|',map { ( split(/:/,$_,3) )[1] } @transfer ); + +if ( $ENV{SQL} ) { + print "CREATE TABLE rsync_transfer (\n\t", + join(",\n\t", map { my @m = split(/:/,$_,3); "$m[0] $m[2]" } @transfer), + "\n);\n"; + exit 1; } -my $transfer_log = { - ip => '%o', - user => '%u', - host => '%h', - perms => '%B', - file => '%f', - updated => '%i', - len => '%l', - transfered => '%b', - module => '%m', - mtime => '%M', - op => '%o', - pid => '%p', - timestamp => '%t', -}; +my $store = CloudStore::Store->new( $slice ); + +unlink $log_fifo if -f $log_fifo; +mkfifo $log_fifo, 0700 unless -p $log_fifo; + +sub rsync_rebuild_config { my $rsync_config = qq{ @@ -52,13 +70,13 @@ my $rsync_config = qq{ use chroot = no #max connections = 4 -lock file = $dir/rsyncd.lock +lock file = $dir/$port.lock #syslog facility = local5 log file = $log_fifo transfer logging = yes -log format = TRANSFER } . join('|',values %$transfer_log) . qq{ +log format = transfer-log:$transfer_log max verbosity = 5 pid file = $pid_file @@ -66,51 +84,180 @@ pid file = $pid_file # don't check secrets file permission (uid) strict modes = no -pre-xfer exec = /srv/cloudstore/pre-xfer.sh -post-xfer exec = /srv/cloudstore/post-xfer.sh +#pre-xfer exec = /srv/cloudstore/rsync-xfer-trigger.pl +#post-xfer exec = /srv/cloudstore/rsync-xfer-trigger.pl -[dpavlin] - path = /srv/cloudstore/users/dpavlin/blob - auth users = dpavlin - secrets file = /srv/cloudstore/secrets/dpavlin - read only = false +# inplace breaks update of deduped files +refuse options = inplace }; +open(my $p, '<', '/var/lib/extrausers/passwd'); +while(<$p>) { + chomp; + + my ( $login, undef, $uid, $gid, $email, $path, $shell ) = split(/:/,$_,7); + + if ( -d $path && -f "$path/.secrets" && ! -e "$path/.meta/secrets" ) { + $store->api->mkbasepath( "$path/.meta/secrets" ); + rename "$path/.secrets", "$path/.meta/secrets"; + warn "UPGRADE $login rsync secrets location\n"; + } + + if ( -d $path && -f "$path/.meta/secrets" ) { + my @secrets = map { chomp; $_ } read_file "$path/.meta/secrets"; + my $auth_users = join(', ', map { s/:.+$//; $_ } @secrets ); + + $rsync_config .= <<__RSYNC_MODULE__; + +[$login] + path = $path + auth users = $auth_users + secrets file = $path/.meta/secrets + read only = false + uid = $uid + gid = $gid + filter = - /.meta +# refuse options = c delete +# dont compress = * + incoming chmod = u=rwX,g+rX,o+rX + + +__RSYNC_MODULE__ + + print "INFO: added $login = $auth_users\n"; + + } else { + warn "skipped $login: $!"; + } + +} + write_file $cfg_file, $rsync_config; warn "created $cfg_file ", -s $cfg_file, " bytes\n"; -kill_rsync; -sub DESTROY { kill_rsync; }; +} # sub rsync_rebuild_config -my $exec = "rsync --daemon --config $cfg_file --no-detach --port=6501"; +rsync_rebuild_config; -#die "could not fork\n" unless defined(my $pid = fork); -#unless ($pid) { -# warn "start server with $exec\n"; -# exec $exec || die $!; -#} +sub rsync_running_pid { + return unless -e $pid_file; + my $pid = read_file $pid_file; + chomp($pid); + return $pid; +} -warn "START $exec\n"; -open(my $rsync_fd, '-|', $exec); +if ( my $pid = rsync_running_pid ) { + if ( kill 0, $pid ) { + warn "found rsync pid $pid\n"; + kill 1, $pid && warn "reload config"; +=for kill-rsync + kill 2, $pid; + while ( -e $pid_file ) { + warn "waiting for rsync to die...\n"; + sleep 1; + } + + open(my $fifo, '<', $log_fifo); + while ( kill 0, $pid ) { + my $line = <$fifo>; + warn "<<< $line\n"; + } -while ( ! -e $pid_file ) { - sleep 1; + kill 0, $pid && die "can't kill it!"; +=cut + } else { + unlink $pid_file; + } +} + +use POSIX ":sys_wait_h"; +sub REAPER { + my $child; + while ((my $waitedpid = waitpid(-1,WNOHANG)) > 0) { + warn "reaped $waitedpid" . ($? ? " with exit $?" : ''); + } + $SIG{CHLD} = \&REAPER; # loathe SysV +} + +$SIG{CHLD} = \&REAPER; + + +if ( ! -e $pid_file || ! kill( 0, rsync_running_pid ) ) { + my $exec = "$rsync --daemon --config $cfg_file --no-detach --port=$port"; + warn "START $exec\n"; + + die "could not fork\n" unless defined(my $pid = fork); + unless ($pid) { + warn "start server with $exec\n"; + exec $exec || die $!; + } + + warn "wait for pid file"; + while ( ! -e $pid_file ) { + sleep 1; + } } +=for gearman +use Gearman::Client; +my $gearman = Gearman::Client->new; +$gearman->job_servers('127.0.0.1:4730'); +=cut while(1) { - warn "# reading log output from $log_fifo\n"; - open(my $log, '<', $log_fifo); - while( my $line = <$log> ) { - print "LINE: $line"; - if ( $line =~ /transfer-log:(.+\|.+)/ ) { - my (%data); - %data{ keys %$transfer_log } = split(/\|/,$1); # FIXME validate? - print dump(\%data); + die "no rsync running" unless kill 0, rsync_running_pid; + warn "waiting for log from $log_fifo\n"; + open(my $fifo, '<', $log_fifo); + while( my $line = <$fifo> ) { + Module::Refresh->refresh; + chomp $line; + warn $line, $/; + + if ( $line =~ /\[(\d+)\] transfer-log:(.+)/ ) { + my $pid = $1; + my $transfer = $2; + $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g; + my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\|]/, $transfer, 6 ); + my $host = $1 if $login =~ s/\+(.+)//; + +if(0) { + my $path = "users/$login/log"; + mkdir $path unless -d $path; + $path .= "/$yyyy-$mm-$dd"; + my $new_log = ! -e $path; + open( my $log, '>>', $path ); + print $log join('|',@transfer),"\n" if $new_log; # store header + print $log "$transfer\n"; + close $log; +} + + my @v = split(/\|/,$transfer,$#transfer + 1); + my %data; + @data{@transfer_names} = @v ; # FIXME validate? + + $data{pid} = $pid; + # overwrite pid from transfer log with consistant one for start/stop + + print ">>> data ",dump( \%data ) if $ENV{DEBUG}; + + $store->rsync_transfer( \%data ); +=for gearman + $gearman->dispatch_background( 'rsync_transfer' => $json ); +=cut + + die "no rsync running" unless kill 0, rsync_running_pid; + } elsif ( $line =~ m/(unknown module|rebuild|reload|config)/ ) { + warn "refresh modules, rebuild config and HUP rsync"; + Module::Refresh->refresh; + rsync_rebuild_config; + my $pid = rsync_running_pid; + kill 1, $pid && warn "reload config"; + } else { + $store->rsync_log( $line ); } } - close($log); + close($fifo); sleep 1; }