create user/log/yyyy-mm-dd transfer logs
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12 use JSON::XS;
13
14 my $dir = '/srv/cloudstore/var';
15 my $log_fifo = "$dir/rsyncd.log";
16 my $pid_file = "$dir/rsyncd.pid";
17 my $cfg_file = "$dir/rsyncd.conf";
18 my $users    = "users";
19
20 my @transfer = qw(
21 timestamp:%t:timestamp
22 login:%u:text
23 pid:%p:int
24 perms:%B:text
25 itemize:%i:text
26 mtime:%M:timestamp
27 op:%o:text
28 size:%l:int
29 transfered:%b:int
30 file:%f:text
31 );
32
33 my @transfer_names =          map { ( split(/:/,$_,3) )[0] } @transfer;
34 my $transfer_log   = join(' ',map { ( split(/:/,$_,3) )[1] } @transfer );
35
36 mkdir $dir if ! -e $dir;
37
38 mkfifo $log_fifo, 0700 unless -p $log_fifo;
39
40 my $rsync_config = qq{
41
42 #uid = nobody
43 #gid = nogroup
44 #use chroot = yes
45 use chroot = no
46
47 #max connections = 4
48 lock file = $dir/rsyncd.lock
49
50 #syslog facility = local5
51 log file  = $log_fifo
52
53 transfer logging = yes
54 log format = transfer-log:$transfer_log
55 max verbosity = 5
56
57 pid file  = $pid_file
58
59 # don't check secrets file permission (uid)
60 strict modes = no
61
62 pre-xfer exec = /srv/cloudstore/pre-xfer.sh
63 post-xfer exec = /srv/cloudstore/post-xfer.sh
64
65 [dpavlin]
66         path = /srv/cloudstore/users/dpavlin/blob
67         auth users = dpavlin
68         secrets file = /srv/cloudstore/secrets/dpavlin
69         read only = false
70
71 };
72
73 write_file $cfg_file, $rsync_config;
74 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
75
76 if ( -e $pid_file ) {
77         my $pid = read_file $pid_file;
78         chomp($pid);
79         if ( kill 0, $pid ) {
80                 warn "found rsync pid $pid";
81         } else {
82                 unlink $pid_file;
83         }
84 }
85
86 if ( ! -e $pid_file ) {
87         my $exec = "rsync --daemon --config $cfg_file --no-detach --port=6501";
88         warn "START $exec\n";
89
90         die "could not fork\n" unless defined(my $pid = fork);
91         unless ($pid) {
92                 warn "start server with $exec\n";
93                 exec $exec || die $!;
94         }
95
96         warn "wait for pid file";
97         while ( ! -e $pid_file ) {
98                 sleep 1;
99         }
100 }
101
102 use Gearman::Client;
103 my $gearman = Gearman::Client->new;
104 $gearman->job_servers('127.0.0.1:4730');
105
106 while(1) {
107         warn "# reading log output from $log_fifo\n";
108         open(my $fifo, '<', $log_fifo);
109         while( my $line = <$fifo> ) {
110                 chomp $line;
111                 print $line, $/;
112                 if ( $line =~ /transfer-log:(.+)/ ) {
113                         my $transfer = $1;
114                         $transfer =~ s|(\d\d\d\d)/(\d\d)/(\d\d)[-\s](\d\d:\d\d:\d\d)|$1-$2-$3T$4|g;
115 warn "XXX $transfer";
116                         my ( $yyyy,$mm,$dd,undef,$login,undef ) = split( /[\-T\s]/, $transfer, 6 );
117
118                         my $path = "users/$login/log";
119                         mkdir $path unless -d $path;
120                         $path .= "/$yyyy-$mm-$dd";
121 warn "## $path $transfer\n";
122                         my $new_log = ! -e $path;
123                         open( my $log, '>>', $path );
124                         print $log join(' ',@transfer),"\n" if $new_log; # store header
125                         print $log "$transfer\n";
126                         close $log;
127
128
129                         my @v = split(/\s+/,$transfer,$#transfer + 1);
130                         my %data;
131                         @data{@transfer_names} = @v ; # FIXME validate?
132
133                         print ">>> data ",dump( \%data );
134
135                         $gearman->dispatch_background( 'rsync_transfer' => encode_json \%data );
136
137                 }
138         }
139         close($fifo);
140         sleep 1;
141 }
142