implement back-end triggers for CouchDB
[angular-mojolicious.git] / couchdb-trigger.pl
diff --git a/couchdb-trigger.pl b/couchdb-trigger.pl
new file mode 100755 (executable)
index 0000000..5f02e0f
--- /dev/null
@@ -0,0 +1,106 @@
+#!/usr/bin/perl
+
+# http://wiki.apache.org/couchdb/HTTP_database_API#Changes
+
+use warnings;
+use strict;
+
+use lib 'common/mojo/lib';
+
+use Mojo::Client;
+use Mojo::JSON;
+
+my $url = 'http://localhost:5984/monitor';
+my $seq = 0;
+
+my $client = Mojo::Client->new;
+our $json   = Mojo::JSON->new;
+sub info { warn $_[0], " ",$json->encode($_[1]),$/ }
+sub debug { info "# $_[0]", $_[1] }
+my $error;
+
+$client->keep_alive_timeout(90); # couchdb timeout is 60s
+
+while( ! $error ) {
+
+       my $changes_feed = "$url/_changes?feed=continuous;include_docs=true;since=$seq";
+       info 'GET' => $changes_feed;
+       my $tx = $client->build_tx( GET => $changes_feed );
+       $tx->res->body(sub{
+               my ( $content, $body ) = @_;
+
+               debug 'BODY' => $body;
+
+               if ( length($body) == 0 ) {
+                       warn "# empty chunk, heartbeat?\n";
+                       return;
+               }
+
+               foreach ( split(/\r?\n/, $body) ) { # we can get multiple documents in one chunk
+
+                       my $change = $json->decode($_);
+
+                       if ( exists $change->{error} ) {
+                               $error = $change;
+                       } elsif ( exists $change->{last_seq} ) {
+                               $seq = $change->{last_seq};
+                       } elsif ( $change->{seq} <= $seq ) {
+                               info "ERROR: stale" => $change;
+                       } elsif ( exists $change->{changes} ) {
+
+                               my $id  = $change->{id} || warn "no id?";
+                               my $rev = $change->{changes}->[0]->{rev} || warn "no rev?";
+                                  $seq = $change->{seq} || warn "no seq?";
+
+                               debug 'change' => $change;
+
+                               if ( my $trigger = $change->{doc}->{trigger} ) {
+                                       if ( exists $trigger->{active} ) {
+                                               debug 'trigger.active',  $change->{doc}->{trigger}->{active};
+                                       } else {
+                                               $trigger->{active} = [ time() ];
+
+                                               debug 'TRIGGER start PUT ', $change->{doc};
+                                               $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
+                                                       my ($client,$tx) = @_;
+                                                       if ($tx->error) {
+                                                               if ( $tx->res->code == 409 ) {
+                                                                       info "TRIGGER ABORTED started on another worker? ", $tx->error;
+                                                               } else {
+                                                                       info "ERROR ", $tx->error;
+                                                               }
+                                                       } else {
+                                                               my $res = $tx->res->json;
+                                                               $change->{doc}->{_rev} = $res->{rev};
+                                                               debug "TRIGGER execute ", $change->{doc};
+
+                                                               # FIXME trigger logic?
+
+                                                               push @{ $trigger->{active} }, time(); # timestamp step
+
+                                                               $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
+                                                                       my ($client,$tx) = @_;
+                                                                       if ($tx->error) {
+                                                                               info "ERROR", $tx->error;
+                                                                       } else {
+                                                                               my $res = $tx->res->json;
+                                                                               $change->{doc}->{_rev} = $res->{rev};
+                                                                               info "TRIGGER finish ", $change->{doc};
+                                                                       }
+                                                               })->process;
+                                                       }
+                                               })->process;
+                                       }
+                               }
+                       } else {
+                               warn "UNKNOWN", $json->encode($change);
+                       }
+
+               }
+
+       });
+       $client->start($tx);
+
+}
+
+die "ERROR ", $json->encode($error) if $error;