rewrite CouchDB trigger
authorDobrica Pavlinusic <dpavlin@rot13.org>
Mon, 3 Oct 2011 20:34:04 +0000 (22:34 +0200)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Mon, 3 Oct 2011 20:34:04 +0000 (22:34 +0200)
This version uses just one put request for each received document,
but expects to get notification of lock which is now first element
of active array in format of hostname:pid

It makes it play nicely with new Mojo::UserAgent which doesn't
want to issue multiple put requests even if I create new Mojo::UserAgent
for each of them.

couchdb-trigger.pl

index 2ec5705..f23e505 100755 (executable)
@@ -27,9 +27,12 @@ $trigger_path ||= 'trigger/shell.pm' ;
 
 our $database = $1 if $url =~ m{/(\w+)/?$};
 
 
 our $database = $1 if $url =~ m{/(\w+)/?$};
 
-sub commit { warn "# commit ignored\n"; }
 require $trigger_path if -e $trigger_path;
 
 require $trigger_path if -e $trigger_path;
 
+my $uid = `hostname -s`;
+chomp $uid;
+$uid .= ':' . $$;
+
 my $seq = 0;
 
 my $client = Mojo::UserAgent->new;
 my $seq = 0;
 
 my $client = Mojo::UserAgent->new;
@@ -71,41 +74,45 @@ while( ! $error ) {
                                debug 'change' => $change;
 
                                if ( filter($change) ) {
                                debug 'change' => $change;
 
                                if ( filter($change) ) {
-                                       if ( exists $change->{doc}->{trigger}->{active} ) {
-                                               debug 'trigger.active',  $change->{doc}->{trigger}->{active};
-                                       } else {
-                                               $change->{doc}->{trigger}->{active} = [ time() ];
+
+                                       if ( ! exists $change->{doc}->{trigger}->{active} ) {
+                                               $change->{doc}->{trigger}->{active} = [ $uid, time() ];
 
                                                debug 'TRIGGER start PUT ', $change->{doc};
 
                                                debug 'TRIGGER start PUT ', $change->{doc};
-                                               $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
-                                                       my ($client,$tx) = @_;
-                                                       if ($tx->error) {
-                                                               if ( $tx->res->code == 409 ) {
-                                                                       info "TRIGGER ABORTED started on another worker? ", $tx->error;
+                                               my $client = Mojo::UserAgent->new;
+                                               my $res = $client->put( "$url/$id" => $json->encode( $change->{doc} ) )->res;
+warn "code ", $res->code, dump( $res->json );
+                                               if ( $res->code == 409 ) {
+                                                       info "TRIGGER ABORTED started on another worker? ", $res->error;
+                                                       next;
+                                               } elsif ( $res->code != 201 ) {
+                                                       info "ERROR $url/$id ", $res->code;
+                                               }
+
+
+                                       } elsif ( $change->{doc}->{trigger}->{active}->[0] eq $uid ) {
+                                               if ( exists $change->{doc}->{trigger}->{active}->[2] ) {
+                                                       warn "allready executed";
+                                                       next;
+                                               } else {
+
+                                                       debug "TRIGGER execute ", $change->{doc};
+                                                       trigger( $change );
+
+                                                       push @{ $change->{doc}->{trigger}->{active} }, time(), 0; # last timestamp
+warn "change ",dump $change;
+
+                                                       my $client = Mojo::UserAgent->new;
+                                                       my $res = $client->put( "$url/$id" => $json->encode( $change->{doc} ) )->res;
+warn "code ", $res->code;
+                                                       if ( my $json = $res->json ) {
+warn dump($json);
+                                                                       $change->{doc}->{_rev} = $json->{rev};
+                                                                       info "TRIGGER finish ", $change->{doc};
                                                                } else {
                                                                } else {
-                                                                       info "ERROR $url/$id ", $tx->error;
-                                                               }
-                                                       } else {
-                                                               my $res = $tx->res->json;
-                                                               $change->{doc}->{_rev} = $res->{rev};
-
-                                                               debug "TRIGGER execute ", $change->{doc};
-                                                               trigger( $change );
-
-                                                               push @{ $change->{doc}->{trigger}->{active} }, time(), 0; # last timestamp
-
-                                                               $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub {
-                                                                       my ($client,$tx) = @_;
-                                                                       if ($tx->error) {
-                                                                               info "ERROR $url/$id", $tx->error;
-                                                                       } else {
-                                                                               my $res = $tx->res->json;
-                                                                               $change->{doc}->{_rev} = $res->{rev};
-                                                                               info "TRIGGER finish ", $change->{doc};
-                                                                       }
-                                                               })->process;
+                                                                       info "ERROR $url/$id", $tx->error;
                                                        }
                                                        }
-                                               })->process;
+                                               }
                                        }
                                }
                        } else {
                                        }
                                }
                        } else {
@@ -114,8 +121,6 @@ while( ! $error ) {
 
                }
 
 
                }
 
-               commit;
-
        });
        $client->start($tx);
 
        });
        $client->start($tx);