From a47b4febe050bd2cf5deec4f96a2c08fe80975f7 Mon Sep 17 00:00:00 2001 From: Dobrica Pavlinusic Date: Mon, 3 Oct 2011 22:34:04 +0200 Subject: [PATCH] rewrite CouchDB trigger This version uses just one put request for each received document, but expects to get notification of lock which is now first element of active array in format of hostname:pid It makes it play nicely with new Mojo::UserAgent which doesn't want to issue multiple put requests even if I create new Mojo::UserAgent for each of them. --- couchdb-trigger.pl | 73 +++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/couchdb-trigger.pl b/couchdb-trigger.pl index 2ec5705..f23e505 100755 --- a/couchdb-trigger.pl +++ b/couchdb-trigger.pl @@ -27,9 +27,12 @@ $trigger_path ||= 'trigger/shell.pm' ; our $database = $1 if $url =~ m{/(\w+)/?$}; -sub commit { warn "# commit ignored\n"; } require $trigger_path if -e $trigger_path; +my $uid = `hostname -s`; +chomp $uid; +$uid .= ':' . $$; + my $seq = 0; my $client = Mojo::UserAgent->new; @@ -71,41 +74,45 @@ while( ! $error ) { debug 'change' => $change; if ( filter($change) ) { - if ( exists $change->{doc}->{trigger}->{active} ) { - debug 'trigger.active', $change->{doc}->{trigger}->{active}; - } else { - $change->{doc}->{trigger}->{active} = [ time() ]; + + if ( ! exists $change->{doc}->{trigger}->{active} ) { + $change->{doc}->{trigger}->{active} = [ $uid, time() ]; debug 'TRIGGER start PUT ', $change->{doc}; - $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub { - my ($client,$tx) = @_; - if ($tx->error) { - if ( $tx->res->code == 409 ) { - info "TRIGGER ABORTED started on another worker? ", $tx->error; + my $client = Mojo::UserAgent->new; + my $res = $client->put( "$url/$id" => $json->encode( $change->{doc} ) )->res; +warn "code ", $res->code, dump( $res->json ); + if ( $res->code == 409 ) { + info "TRIGGER ABORTED started on another worker? ", $res->error; + next; + } elsif ( $res->code != 201 ) { + info "ERROR $url/$id ", $res->code; + } + + + } elsif ( $change->{doc}->{trigger}->{active}->[0] eq $uid ) { + if ( exists $change->{doc}->{trigger}->{active}->[2] ) { + warn "allready executed"; + next; + } else { + + debug "TRIGGER execute ", $change->{doc}; + trigger( $change ); + + push @{ $change->{doc}->{trigger}->{active} }, time(), 0; # last timestamp +warn "change ",dump $change; + + my $client = Mojo::UserAgent->new; + my $res = $client->put( "$url/$id" => $json->encode( $change->{doc} ) )->res; +warn "code ", $res->code; + if ( my $json = $res->json ) { +warn dump($json); + $change->{doc}->{_rev} = $json->{rev}; + info "TRIGGER finish ", $change->{doc}; } else { - info "ERROR $url/$id ", $tx->error; - } - } else { - my $res = $tx->res->json; - $change->{doc}->{_rev} = $res->{rev}; - - debug "TRIGGER execute ", $change->{doc}; - trigger( $change ); - - push @{ $change->{doc}->{trigger}->{active} }, time(), 0; # last timestamp - - $client->put( "$url/$id" => $json->encode( $change->{doc} ) => sub { - my ($client,$tx) = @_; - if ($tx->error) { - info "ERROR $url/$id", $tx->error; - } else { - my $res = $tx->res->json; - $change->{doc}->{_rev} = $res->{rev}; - info "TRIGGER finish ", $change->{doc}; - } - })->process; + info "ERROR $url/$id", $tx->error; } - })->process; + } } } } else { @@ -114,8 +121,6 @@ while( ! $error ) { } - commit; - }); $client->start($tx); -- 2.20.1