016bf83a38e0b2ea2fba4dc85a5366004475f612
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12 use JSON::XS;
13
14 my $dir   = $ENV{RSYNC_DIR}  || '/srv/cloudstore/var';
15 my $port  = $ENV{RSYNC_PORT} || 6501;
16 my $users = "users";
17
18 my $log_fifo = "$dir/$port.log";
19 my $pid_file = "$dir/$port.pid";
20 my $cfg_file = "$dir/$port.conf";
21
22 my $rsync = 'rsync';
23 $rsync = 'bin/rsync' if -x 'bin/rsync'; # use 3.1dev version!
24
25 my @transfer = qw(
26 timestamp:%t:timestamp
27 login:%u:text
28 port:$port:int
29 pid:%p:int
30 perms:%B:text
31 itemize:%i:text
32 mtime:%M:timestamp
33 md5:%C:text
34 op:%o:text
35 size:%l:int
36 transfered:%b:int
37 file:%f:text
38 );
39
40 $transfer[2] = "port:$port:int"; # expand $port
41
42 my @transfer_names =          map { ( split(/:/,$_,3) )[0] } @transfer;
43 my $transfer_log   = join('|',map { ( split(/:/,$_,3) )[1] } @transfer );
44
45 if ( $ENV{SQL} ) {
46         print "CREATE TABLE rsync_transfer (\n\t",
47         join(",\n\t", map { my @m = split(/:/,$_,3); "$m[0] $m[2]" } @transfer),
48         "\n);\n";
49         exit 1;
50 }
51
52 mkdir $dir if ! -e $dir;
53
54 mkfifo $log_fifo, 0700 unless -p $log_fifo;
55
56 my $rsync_config = qq{
57
58 #uid = nobody
59 #gid = nogroup
60 #use chroot = yes
61 use chroot = no
62
63 #max connections = 4
64 lock file = $dir/rsyncd.lock
65
66 #syslog facility = local5
67 log file  = $log_fifo
68
69 transfer logging = yes
70 log format = transfer-log:$transfer_log
71 max verbosity = 5
72
73 pid file  = $pid_file
74
75 # don't check secrets file permission (uid)
76 strict modes = no
77
78 pre-xfer exec = /srv/cloudstore/pre-xfer.sh
79 post-xfer exec = /srv/cloudstore/post-xfer.sh
80
81 [dpavlin]
82         path = /srv/cloudstore/users/dpavlin/blob
83         auth users = dpavlin
84         secrets file = /srv/cloudstore/secrets/dpavlin
85         read only = false
86
87 };
88
89 write_file $cfg_file, $rsync_config;
90 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
91
92 if ( -e $pid_file ) {
93         my $pid = read_file $pid_file;
94         chomp($pid);
95         if ( kill 0, $pid ) {
96                 warn "found rsync pid $pid";
97         } else {
98                 unlink $pid_file;
99         }
100 }
101
102 if ( ! -e $pid_file ) {
103         my $exec = "$rsync --daemon --config $cfg_file --no-detach --port=$port";
104         warn "START $exec\n";
105
106         die "could not fork\n" unless defined(my $pid = fork);
107         unless ($pid) {
108                 warn "start server with $exec\n";
109                 exec $exec || die $!;
110         }
111
112         warn "wait for pid file";
113         while ( ! -e $pid_file ) {
114                 sleep 1;
115         }
116 }
117
118 use Gearman::Client;
119 my $gearman = Gearman::Client->new;
120 $gearman->job_servers('127.0.0.1:4730');
121
122 while(1) {
123         warn "# reading log output from $log_fifo\n";
124         open(my $fifo, '<', $log_fifo);
125         while( my $line = <$fifo> ) {
126                 chomp $line;
127                 print $line, $/;
128                 if ( $line =~ /transfer-log:(.+)/ ) {
129                         my $transfer = $1;
130                         $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g;
131 warn "XXX $transfer";
132                         my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\|]/, $transfer, 6 );
133
134                         my $path = "users/$login/log";
135                         mkdir $path unless -d $path;
136                         $path .= "/$yyyy-$mm-$dd";
137 warn "## $path $transfer\n";
138                         my $new_log = ! -e $path;
139                         open( my $log, '>>', $path );
140                         print $log join('|',@transfer),"\n" if $new_log; # store header
141                         print $log "$transfer\n";
142                         close $log;
143
144
145                         my @v = split(/\|/,$transfer,$#transfer + 1);
146                         my %data;
147                         @data{@transfer_names} = @v ; # FIXME validate?
148
149                         print ">>> data ",dump( \%data );
150
151                         $gearman->dispatch_background( 'rsync_transfer' => encode_json \%data );
152
153                 }
154         }
155         close($fifo);
156         sleep 1;
157 }
158