3 # http://wiki.apache.org/couchdb/HTTP_database_API#Changes
8 use lib 'common/mojo/lib';
13 my $url = 'http://localhost:5984/monitor';
16 my $client = Mojo::Client->new;
17 our $json = Mojo::JSON->new;
18 sub info { warn $_[0], " ",$json->encode($_[1]),$/ }
19 sub debug { info "# $_[0]", $_[1] }
22 $client->keep_alive_timeout(90); # couchdb timeout is 60s
26 my $changes_feed = "$url/_changes?feed=continuous;include_docs=true;since=$seq";
27 info 'GET' => $changes_feed;
28 my $tx = $client->build_tx( GET => $changes_feed );
30 my ( $content, $body ) = @_;
32 debug 'BODY' => $body;
34 if ( length($body) == 0 ) {
35 warn "# empty chunk, heartbeat?\n";
39 foreach ( split(/\r?\n/, $body) ) { # we can get multiple documents in one chunk
41 my $change = $json->decode($_);
43 if ( exists $change->{error} ) {
45 } elsif ( exists $change->{last_seq} ) {
46 $seq = $change->{last_seq};
47 } elsif ( $change->{seq} <= $seq ) {
48 info "ERROR: stale" => $change;
49 } elsif ( exists $change->{changes} ) {
51 my $id = $change->{id} || warn "no id?";
52 my $rev = $change->{changes}->[0]->{rev} || warn "no rev?";
53 $seq = $change->{seq} || warn "no seq?";
55 debug 'change' => $change;
57 if ( my $trigger = $change->{doc}->{trigger} ) {
58 if ( exists $trigger->{active} ) {
59 debug 'trigger.active', $change->{doc}->{trigger}->{active};
61 $trigger->{active} = [ time() ];
63 debug 'TRIGGER start PUT ', $change->{doc};
64 $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
65 my ($client,$tx) = @_;
67 if ( $tx->res->code == 409 ) {
68 info "TRIGGER ABORTED started on another worker? ", $tx->error;
70 info "ERROR ", $tx->error;
73 my $res = $tx->res->json;
74 $change->{doc}->{_rev} = $res->{rev};
75 debug "TRIGGER execute ", $change->{doc};
77 # FIXME trigger logic?
79 push @{ $trigger->{active} }, time(); # timestamp step
81 $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
82 my ($client,$tx) = @_;
84 info "ERROR", $tx->error;
86 my $res = $tx->res->json;
87 $change->{doc}->{_rev} = $res->{rev};
88 info "TRIGGER finish ", $change->{doc};
96 warn "UNKNOWN", $json->encode($change);
106 die "ERROR ", $json->encode($error) if $error;