Merge branch 'master' of github.com:dpavlin/angular-mojolicious
[angular-mojolicious.git] / couchdb-trigger.pl
1 #!/usr/bin/perl
2
3 # back-end trigger server for CouchDB monitoring changes feed:
4 #
5 # http://wiki.apache.org/couchdb/HTTP_database_API#Changes
6 #
7 # implements state machine using document which you cen put with:
8 #
9 # curl -X PUT http://localhost:5984/monitor/df -d '{"trigger":{"command":"df -P","format":"table"}}'
10 #
11 # DEFAULT TRIGGER EXECUTE SHELL COMMANDS. IT IS NOT SECURE IF YOUR COUCHDB ISN'T SECURE!
12
13 use warnings;
14 use strict;
15
16 use lib 'common/mojo/lib';
17
18 use Mojo::UserAgent;
19 use Mojo::JSON;
20 use Time::HiRes qw(time);
21 use Data::Dump qw(dump);
22
23 my ( $url, $trigger_path ) = @ARGV;
24
25 $url          ||= 'http://localhost:5984/monitor';
26 $trigger_path ||= 'trigger/shell.pm' ;
27
28 our $database = $1 if $url =~ m{/(\w+)/?$};
29
30 require $trigger_path if -e $trigger_path;
31
32 my $uid = `hostname -s`;
33 chomp $uid;
34 $uid .= ':' . $$;
35
36 my $seq = 0;
37
38 my $client = Mojo::UserAgent->new;
39 our $json   = Mojo::JSON->new;
40 sub info { warn $_[0], " ",$json->encode($_[1]),$/ }
41 sub debug { info "# $_[0]", $_[1] }
42 my $error;
43
44 $client->keep_alive_timeout(90); # couchdb timeout is 60s
45
46 while( ! $error ) {
47
48         my $changes_feed = "$url/_changes?feed=continuous;include_docs=true;since=$seq";
49         info 'GET' => $changes_feed;
50         my $tx = $client->build_tx( GET => $changes_feed );
51         $tx->res->body(sub{
52                 my ( $content, $body ) = @_;
53
54                 return if length($body) == 0; # empty chunk, heartbeat?
55
56                 debug 'BODY' => $body;
57
58                 foreach ( split(/\r?\n/, $body) ) { # we can get multiple documents in one chunk
59
60                         my $change = $json->decode($_);
61
62                         if ( exists $change->{error} ) {
63                                 $error = $change;
64                         } elsif ( exists $change->{last_seq} ) {
65                                 $seq = $change->{last_seq};
66                         } elsif ( $change->{seq} <= $seq ) {
67                                 info "ERROR: stale" => $change;
68                         } elsif ( exists $change->{changes} ) {
69
70                                 my $id  = $change->{id} || warn "no id?";
71                                 my $rev = $change->{changes}->[0]->{rev} || warn "no rev?";
72                                    $seq = $change->{seq} || warn "no seq?";
73
74                                 debug 'change' => $change;
75
76                                 if ( filter($change) ) {
77
78                                         if ( ! exists $change->{doc}->{trigger}->{active} ) {
79                                                 $change->{doc}->{trigger}->{active} = [ $uid, time() ];
80
81                                                 debug 'TRIGGER start PUT ', $change->{doc};
82                                                 my $client = Mojo::UserAgent->new;
83                                                 my $res = $client->put( "$url/$id" => $json->encode( $change->{doc} ) )->res;
84 warn "code ", $res->code, dump( $res->json );
85                                                 if ( $res->code == 409 ) {
86                                                         info "TRIGGER ABORTED started on another worker? ", $res->error;
87                                                         next;
88                                                 } elsif ( $res->code != 201 ) {
89                                                         info "ERROR $url/$id ", $res->code;
90                                                 }
91
92
93                                         } elsif ( $change->{doc}->{trigger}->{active}->[0] eq $uid ) {
94                                                 if ( exists $change->{doc}->{trigger}->{active}->[2] ) {
95                                                         warn "allready executed";
96                                                         next;
97                                                 } else {
98
99                                                         debug "TRIGGER execute ", $change->{doc};
100                                                         trigger( $change );
101
102                                                         push @{ $change->{doc}->{trigger}->{active} }, time(), 0; # last timestamp
103 warn "change ",dump $change;
104
105                                                         my $client = Mojo::UserAgent->new;
106                                                         my $res = $client->put( "$url/$id" => $json->encode( $change->{doc} ) )->res;
107 warn "code ", $res->code;
108                                                         if ( my $json = $res->json ) {
109 warn dump($json);
110                                                                         $change->{doc}->{_rev} = $json->{rev};
111                                                                         info "TRIGGER finish ", $change->{doc};
112                                                                 } else {
113                                                                         info "ERROR $url/$id", $tx->error;
114                                                         }
115                                                 }
116                                         }
117                                 }
118                         } else {
119                                 warn "UNKNOWN", $json->encode($change);
120                         }
121
122                 }
123
124         });
125         $client->start($tx);
126
127 }
128
129 die "ERROR ", $json->encode($error) if $error;