3 # back-end trigger server for CouchDB monitoring changes feed:
5 # http://wiki.apache.org/couchdb/HTTP_database_API#Changes
7 # implements state machine using a document which you can put with:
9 # curl -X PUT http://localhost:5984/monitor/df -d '{"trigger":{"command":"df -P","format":"table"}}'
11 # THE DEFAULT TRIGGER EXECUTES SHELL COMMANDS. IT IS NOT SECURE IF YOUR COUCHDB ISN'T SECURE!
16 use lib 'common/mojo/lib';
20 use Time::HiRes qw(time);
# Base URL of the CouchDB database being monitored; the changes-feed URL
# ("$url/_changes...") and the per-document PUT URLs ("$url/$id") are built from it.
22 my $url = 'http://localhost:5984/monitor';
# Trigger handler body. NOTE(review): the enclosing `sub` line and the closing
# braces are not visible in this excerpt; elsewhere in the file this code is
# invoked as _trigger( $doc ).
#
# The first argument is a document hash; only its {trigger} member is used.
25 my $trigger = $_[0]->{trigger};
# Nothing is executed unless trigger.command holds a true value.
26 if ( my $command = $trigger->{command} ) {
# SECURITY: the command string is taken from the document and run through the
# shell with backticks. Its captured output is stored back into the document
# as trigger.output.
28 my $output = $trigger->{output} = `$command`;
# Split the output into lines and every line into whitespace-separated fields,
# giving an array of arrays. This only happens when trigger.format matches
# /table/i (statement modifier on the next line).
# NOTE(review): the left-hand side that receives this structure is on a line
# not shown here — confirm the target.
31 [ map { [ split (/\s+/,$_) ] } split(/\n/,$output) ]
# NOTE(review): trigger.format is matched without a defined-check; presumably
# it may be absent in some documents — verify.
32 if $trigger->{format} =~ m/table/i;
# HTTP client used both for the changes-feed GET and for the document PUTs.
39 my $client = Mojo::Client->new;
# JSON encoder/decoder. Declared with `our`, so it is a package variable; the
# info() logger and the feed-processing code both use it.
40 our $json = Mojo::JSON->new;
# Write a log line to STDERR: the message, one space, then the JSON encoding
# of the accompanying data. The line ends with $/ so that warn does not append
# its own "at FILE line N" suffix.
sub info {
    my ( $message, $data ) = @_;
    warn $message, " ", $json->encode($data), $/;
}
# Debug-level logging: same output as info(), with the message prefixed by "# ".
sub debug {
    my ( $message, $data ) = @_;
    return info( "# $message", $data );
}
# Set the client's keep-alive timeout to 90, above the 60s CouchDB timeout
# mentioned below (presumably both values are in seconds — confirm).
45 $client->keep_alive_timeout(90); # couchdb timeout is 60s
# Build the continuous changes-feed URL. include_docs=true makes every change
# row carry the full document, and since=$seq resumes after the last sequence
# number this worker has recorded.
49 my $changes_feed = "$url/_changes?feed=continuous;include_docs=true;since=$seq";
50 info 'GET' => $changes_feed;
# Prepare the GET transaction for the feed.
# NOTE(review): the lines that attach the chunk callback and start the
# transaction are not visible in this excerpt.
51 my $tx = $client->build_tx( GET => $changes_feed );
# Chunk handler for the changes feed. NOTE(review): the enclosing `sub {` line,
# several else-branches and the closing braces are not visible in this excerpt;
# the comments below describe only the visible statements.
#
# The second argument is the body of the received chunk.
53 my ( $content, $body ) = @_;
55 debug 'BODY' => $body;
# An empty chunk is only reported; it is presumably a feed heartbeat.
57 if ( length($body) == 0 ) {
58 warn "# empty chunk, heartbeat?\n";
# Every non-empty line of the chunk is handled as one JSON object.
62 foreach ( split(/\r?\n/, $body) ) { # we can get multiple documents in one chunk
64 my $change = $json->decode($_);
# Dispatch on the shape of the decoded object.
# Error object: the handling statement is on a line not shown here.
66 if ( exists $change->{error} ) {
# Row carrying last_seq: remember it as the current sequence number.
68 } elsif ( exists $change->{last_seq} ) {
69 $seq = $change->{last_seq};
# A row whose seq is not greater than the recorded one is logged as stale.
# NOTE(review): `<=` is a numeric comparison — assumes seq values are
# numbers; confirm.
70 } elsif ( $change->{seq} <= $seq ) {
71 info "ERROR: stale" => $change;
# Regular change row.
72 } elsif ( exists $change->{changes} ) {
# `||` binds tighter than `=`, so when a field is missing or false the variable
# receives the return value of warn rather than staying undefined.
74 my $id = $change->{id} || warn "no id?";
75 my $rev = $change->{changes}->[0]->{rev} || warn "no rev?";
76 $seq = $change->{seq} || warn "no seq?";
78 debug 'change' => $change;
# Only documents that carry a `trigger` member are acted upon.
80 if ( my $trigger = $change->{doc}->{trigger} ) {
# A trigger that already has an `active` key is only logged here.
81 if ( exists $trigger->{active} ) {
82 debug 'trigger.active', $change->{doc}->{trigger}->{active};
# Record the start time as the first element of trigger.active.
# NOTE(review): presumably this is the else-branch of the check above; the
# `} else {` line is not visible — confirm.
84 $trigger->{active} = [ time() ];
# First PUT: write the document with the `active` marker back to CouchDB.
86 debug 'TRIGGER start PUT ', $change->{doc};
87 $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
88 my ($client,$tx) = @_;
# HTTP 409 is treated as "another worker got there first"; it is only logged.
90 if ( $tx->res->code == 409 ) {
91 info "TRIGGER ABORTED started on another worker? ", $tx->error;
# Other failures are logged (the condition guarding this line is not visible).
93 info "ERROR ", $tx->error;
# Take the new revision from the PUT response and store it in the document,
# which is PUT again below.
96 my $res = $tx->res->json;
97 $change->{doc}->{_rev} = $res->{rev};
# Run the trigger on the document.
99 debug "TRIGGER execute ", $change->{doc};
100 _trigger( $change->{doc} );
# Append the finish time and a trailing 0 to trigger.active.
102 push @{ $trigger->{active} }, time(), 0; # last timestamp
# Second PUT: write the document back after the trigger has run.
104 $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
105 my ($client,$tx) = @_;
# Failure of the second PUT is logged (guarding condition not visible).
107 info "ERROR", $tx->error;
# Record the revision returned by the second PUT and log completion.
109 my $res = $tx->res->json;
110 $change->{doc}->{_rev} = $res->{rev};
111 info "TRIGGER finish ", $change->{doc};
# Fallback: report an object that matched none of the branches above.
119 warn "UNKNOWN", $json->encode($change);
# Abort the program with the JSON-encoded error when $error is set.
# NOTE(review): where $error is assigned is not visible in this excerpt.
129 die "ERROR ", $json->encode($error) if $error;