casopisi 2010
[angular-mojolicious.git] / couchdb-trigger.pl
1 #!/usr/bin/perl
2
3 # back-end trigger server for CouchDB monitoring changes feed:
4 #
5 # http://wiki.apache.org/couchdb/HTTP_database_API#Changes
6 #
7 # implements a state machine using a document which you can put with:
8 #
9 # curl -X PUT http://localhost:5984/monitor/df -d '{"trigger":{"command":"df -P","format":"table"}}'
10 #
11 # THE DEFAULT TRIGGER EXECUTES SHELL COMMANDS. IT IS NOT SECURE IF YOUR COUCHDB ISN'T SECURE!
12
13 use warnings;
14 use strict;
15
16 use lib 'common/mojo/lib';
17
18 use Mojo::Client;
19 use Mojo::JSON;
20 use Time::HiRes qw(time);
21
22 my $url = 'http://localhost:5984/monitor'; # base URL of the database: its _changes feed is read and documents are PUT under it
23
# Run the shell command described by a document's "trigger" member.
#
# Argument:
#   $_[0] - document hash reference; its {trigger} hash is consulted for
#             command - command line, executed through backticks
#             format  - optional; when it matches /table/i the captured
#                       output is turned into an array of rows (split on
#                       newlines), each row an array of whitespace-separated
#                       fields
#
# The captured output is stored in {trigger}{output} of the document.
# Nothing is executed and nothing is stored when there is no command.
#
# Returns the trigger hash reference.
#
# FIXME SECURITY HOLE: the command string comes straight from the document
# and is handed to the shell unchanged, so anybody who can write to the
# database can run arbitrary commands as the user running this script.
sub _trigger {
        my ($doc) = @_;
        my $trigger = $doc->{trigger};

        if ( my $command = $trigger->{command} ) {
                my $output = $trigger->{output} = `$command`;

                # format is optional and backticks give undef when the
                # command could not be started; check both before splitting
                # so that neither case produces "uninitialized" warnings
                my $format = $trigger->{format};
                if ( defined $output && defined $format && $format =~ m/table/i ) {
                        $trigger->{output} =
                                [ map { [ split( /\s+/, $_ ) ] } split( /\n/, $output ) ];
                }
        }

        return $trigger;
}
36
# sequence number used as "since" when (re)opening the changes feed
my $seq = 0;

my $client = Mojo::Client->new;
our $json  = Mojo::JSON->new;

# info( $label, $data )
# Emit $label, a space and $data encoded as JSON through warn.
sub info {
        my ( $label, $data ) = @_;
        warn $label, " ", $json->encode($data), $/;
}

# debug( $label, $data )
# Same as info(), with the label prefixed by "# ".
sub debug {
        my ( $label, $data ) = @_;
        info( "# $label", $data );
}

# holds the feed object that carried an "error" key; ends the main loop
my $error;

$client->keep_alive_timeout(90); # couchdb timeout is 60s
46
# Claim a trigger document and run it.
#
# Arguments:
#   $id  - document id, used to build the PUT URL
#   $doc - decoded document (hash reference) with a {trigger} hash
#
# The document is first PUT with trigger.active = [ start time ].  If that
# PUT is answered with 409 the trigger is left alone (logged as probably
# started by another worker).  Otherwise _trigger() is executed, the finish
# time and a trailing 0 are appended to trigger.active and the document is
# PUT again.
sub _run_trigger {
        my ( $id, $doc ) = @_;
        my $trigger = $doc->{trigger};

        $trigger->{active} = [ time() ];

        debug( 'TRIGGER start PUT ', $doc );
        $client->put( "$url/$id" => $json->encode($doc) => sub {
                my ( $client, $tx ) = @_;

                if ( $tx->error ) {
                        # code is undefined when the request never got a
                        # response, so don't compare it numerically as-is
                        if ( ( $tx->res->code || 0 ) == 409 ) {
                                info( "TRIGGER ABORTED started on another worker? ", $tx->error );
                        } else {
                                info( "ERROR ", $tx->error );
                        }
                        return;
                }

                my $res = $tx->res->json;
                $doc->{_rev} = $res->{rev};

                debug( "TRIGGER execute ", $doc );
                _trigger($doc);

                push @{ $trigger->{active} }, time(), 0; # last timestamp

                $client->put( "$url/$id" => $json->encode($doc) => sub {
                        my ( $client, $tx ) = @_;

                        if ( $tx->error ) {
                                info( "ERROR", $tx->error );
                        } else {
                                my $res = $tx->res->json;
                                $doc->{_rev} = $res->{rev};
                                info( "TRIGGER finish ", $doc );
                        }
                })->process;
        })->process;

        return;
}

# Act on one decoded object from the changes feed.
#
#   {error}    - remembered in $error, which ends the main loop
#   {last_seq} - stored in $seq for the next request
#   seq <= $seq - reported as stale and ignored
#   {changes}  - $seq is advanced; if the included document has a trigger
#                without an "active" member the trigger is started
#   otherwise  - reported as UNKNOWN
sub _process_change {
        my ($change) = @_;

        if ( exists $change->{error} ) {
                $error = $change;
                return;
        }

        if ( exists $change->{last_seq} ) {
                $seq = $change->{last_seq};
                return;
        }

        # NOTE(review): numeric comparison assumes integer sequence numbers
        if ( defined $change->{seq} && $change->{seq} <= $seq ) {
                info( "ERROR: stale" => $change );
                return;
        }

        unless ( defined $change->{seq} && exists $change->{changes} ) {
                warn "UNKNOWN", $json->encode($change);
                return;
        }

        # advance first, so a malformed change below is not requested again
        $seq = $change->{seq};

        my $id     = $change->{id};
        my $has_id = defined $id && length $id;    # "0" is a usable id
        warn "no id?"  unless $has_id;
        warn "no rev?" unless $change->{changes}->[0]->{rev};

        debug( 'change' => $change );

        my $trigger = $change->{doc}->{trigger} or return;

        # a scalar or array "trigger" cannot carry a command; dereferencing
        # it as a hash would die under strict refs
        return unless ref($trigger) eq 'HASH';

        if ( exists $trigger->{active} ) {
                debug( 'trigger.active', $trigger->{active} );
                return;
        }

        return unless $has_id;    # no URL to PUT the document to

        _run_trigger( $id, $change->{doc} );
        return;
}

# Decode one line of the feed and hand it to _process_change().
# Blank lines are skipped; a line that does not decode to a JSON object is
# logged and skipped instead of being treated as a change.
sub _process_line {
        my ($line) = @_;

        return unless defined $line && $line =~ /\S/;

        my $change = $json->decode($line);
        if ( ref($change) ne 'HASH' ) {
                info( 'ERROR: undecodable line' => $line );
                return;
        }

        _process_change($change);
        return;
}

# Main loop: keep requesting the continuous changes feed, starting after the
# last sequence number seen, until the feed delivers an error object.
while ( !$error ) {

        my $changes_feed = "$url/_changes?feed=continuous;include_docs=true;since=$seq";
        info( 'GET' => $changes_feed );
        my $tx = $client->build_tx( GET => $changes_feed );

        # A body chunk can end in the middle of a line; keep the unfinished
        # tail here until the rest of the line arrives.
        my $pending = '';

        $tx->res->body(sub {
                my ( $content, $body ) = @_;

                debug( 'BODY' => $body );

                unless ( defined $body && length $body ) {
                        warn "# empty chunk, heartbeat?\n";
                        return;
                }

                $pending .= $body;

                # we can get multiple documents in one chunk
                while ( $pending =~ s/\A([^\n]*)\n// ) {
                        ( my $line = $1 ) =~ s/\r\z//;
                        _process_line($line);
                }
        });
        $client->start($tx);

        # a last line that was not newline-terminated is still processed
        _process_line($pending);

        # When the request itself failed, wait a moment before reconnecting
        # so that an unreachable server does not turn this into a busy loop.
        if ( !$error ) {
                my $failure = $tx->error;
                if ($failure) {
                        info( 'ERROR feed' => $failure );
                        sleep 1;
                }
        }

}

die "ERROR ", $json->encode($error) if $error;