20f27d48e2354c787b1dc0ab7374195a9332e5c2
[cloudstore.git] / rsync-piper.pl
1 #!/usr/bin/perl
2 use warnings;
3 use strict;
4
5 use autodie;
6 use POSIX;
7 use File::Slurp;
8 use IO::Select;
9 use Time::HiRes;
10 use Data::Dump qw(dump);
11 use English;
12 use JSON::XS;
13
14 my $dir = '/srv/cloudstore/var';
15 my $log_fifo = "$dir/rsyncd.log";
16 my $pid_file = "$dir/rsyncd.pid";
17 my $cfg_file   = "$dir/rsyncd.conf";
18 my $users    = "users";
19
20 mkdir $dir if ! -e $dir;
21
22 mkfifo $log_fifo, 0700 unless -p $log_fifo;
23
24 my $transfer_log = {
25         ip => '%a',
26         login => '%u',
27         host => '%h',
28         perms => '%B',
29         file => '%f',
30         updated => '%i',
31         len => '%l',
32         transfered => '%b',
33 #       module => '%m',
34         mtime => '%M',
35         op => '%o',
36         pid => '%p',
37         timestamp => '%t',
38 };
39
40 my $rsync_config = qq{
41
42 #uid = nobody
43 #gid = nogroup
44 #use chroot = yes
45 use chroot = no
46
47 #max connections = 4
48 lock file = $dir/rsyncd.lock
49
50 #syslog facility = local5
51 log file  = $log_fifo
52
53 transfer logging = yes
54 log format = transfer-log:} . join('|',values %$transfer_log) . qq{
55 max verbosity = 5
56
57 pid file  = $pid_file
58
59 # don't check secrets file permission (uid)
60 strict modes = no
61
62 pre-xfer exec = /srv/cloudstore/pre-xfer.sh
63 post-xfer exec = /srv/cloudstore/post-xfer.sh
64
65 [dpavlin]
66         path = /srv/cloudstore/users/dpavlin/blob
67         auth users = dpavlin
68         secrets file = /srv/cloudstore/secrets/dpavlin
69         read only = false
70
71 };
72
73 write_file $cfg_file, $rsync_config;
74 warn "created $cfg_file ", -s $cfg_file, " bytes\n";
75
76 if ( -e $pid_file ) {
77         my $pid = read_file $pid_file;
78         chomp($pid);
79         if ( kill 0, $pid ) {
80                 warn "found rsync pid $pid";
81         } else {
82                 unlink $pid_file;
83         }
84 }
85
86 if ( ! -e $pid_file ) {
87         my $exec = "rsync --daemon --config $cfg_file --no-detach --port=6501";
88         warn "START $exec\n";
89
90         die "could not fork\n" unless defined(my $pid = fork);
91         unless ($pid) {
92                 warn "start server with $exec\n";
93                 exec $exec || die $!;
94         }
95
96         warn "wait for pid file";
97         while ( ! -e $pid_file ) {
98                 sleep 1;
99         }
100 }
101
102
103 use Gearman::Client;
104 my $client = Gearman::Client->new;
105 $client->job_servers('127.0.0.1:4730');
106
107 while(1) {
108         warn "# reading log output from $log_fifo\n";
109         open(my $log, '<', $log_fifo);
110         while( my $line = <$log> ) {
111                 print "LINE: $line";
112                 if ( $line =~ /transfer-log:(.+\|.+)/ ) {
113                         my %data;
114                         my @k = keys %$transfer_log;
115                         my @v = split(/\|/,$1);
116                         @data{@k} = @v ; # FIXME validate?
117
118                         # fixup data
119                         $data{mtime} =~ s|^(\d\d\d\d)/(\d\d)/(\d\d)-(\d\d:\d\d:\d\d)|$1-$2-$3 $4| && warn "fixed mtime ISO $data{mtime}\n";
120
121                         print "transfer-log:",dump(\%data),$/;
122
123                         my $json = encode_json \%data;
124
125                         my $path = sprintf "users/%s/data/%.5f-%d",
126                                 $data{login}, Time::HiRes::time(), $data{pid};
127                         open(my $fh, '>', $path);
128                         print $fh $json;
129                         close $fh;
130                         print $path, " ", -s $path, " bytes\n";
131
132                         $client->dispatch_background( 'log' => $json );
133
134                 }
135         }
136         close($log);
137         sleep 1;
138 }
139