zebraqueue - major changes
authorGalen Charlton <galen.charlton@liblime.com>
Wed, 5 Mar 2008 00:26:02 +0000 (18:26 -0600)
committerJoshua Ferraro <jmf@liblime.com>
Wed, 5 Mar 2008 04:25:11 +0000 (22:25 -0600)
Signed-off-by: Joshua Ferraro <jmf@liblime.com>
misc/bin/zebraqueue_daemon.pl

index b0ec125..ea788d5 100755 (executable)
@@ -9,6 +9,7 @@ BEGIN {
     use FindBin;
     eval { require "$FindBin::Bin/kohalib.pl" };
 }
+
 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
 use Unix::Syslog qw(:macros);
 
@@ -17,12 +18,75 @@ use C4::Biblio;
 use C4::Search;
 use C4::AuthoritiesMarc;
 use XML::Simple;
+use POSIX;
 use utf8;
 
-my $dbh = C4::Context->dbh;
+
+# wait periods governing connection attempts
+my $min_connection_wait =    1; # start off at 1 second
+my $max_connection_wait = 1024; # max about 17 minutes
+
+# keep separate wait period for bib and authority Zebra databases
+my %zoom_connection_waits = (); 
+
+my $db_connection_wait = $min_connection_wait;
+
+# ZOOM and Z39.50 errors that are potentially
+# resolvable by connecting again and retrying
+# the operation
+my %retriable_zoom_errors = (
+    10000 => 'ZOOM_ERROR_CONNECT',
+    10001 => 'ZOOM_ERROR_MEMORY',
+    10002 => 'ZOOM_ERROR_ENCODE',
+    10003 => 'ZOOM_ERROR_DECODE',
+    10004 => 'ZOOM_ERROR_CONNECTION_LOST',
+    10005 => 'ZOOM_ERROR_INIT',
+    10006 => 'ZOOM_ERROR_INTERNAL',
+    10007 => 'ZOOM_ERROR_TIMEOUT',
+);
+
+# structure to store updates that have
+# failed and are to be retrieved.  The
+# structure is a hashref of hashrefs, 
+# e.g.,
+#
+# $postoned_updates->{$server}->{$record_number} = 1;
+#
+# If an operation is attempted and fails because
+# of a retriable error (see above), the daemon
+# will try several times to recover as follows:
+#
+# 1. close and reopen the connection to the
+#    Zebra server, unless the error was a timeout,
+#    in which case
+# 2. retry the operation
+#
+# If, after trying this five times, the operation still
+# fails, the daemon will mark the record number as
+# postponed, and try to process other entries in 
+# zebraqueue.  When an update is postponed, the 
+# error will be reported to syslog. 
+#
+# If more than 100 postponed updates are 
+# accumulated, the daemon will assume that 
+# something is seriously wrong, complain loudly,
+# and abort.  If running under the daemon(1) command, 
+# this means that the daemon will respawn.
+#
+my $num_postponed_updates = 0;
+my $postponed_updates = {};
+
+my $max_operation_attempts =   5;
+my $max_postponed_updates  = 100;
+
+# Zebra connection timeout
+my $zconn_timeout            =  30;
+my $zconn_timeout_multiplier = 1.5;
+my $max_zconn_timeout        = 120;
+
 my $ident = "Koha Zebraqueue ";
 
-my $debug = 1;
+my $debug = 0;
 Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0;
 
 Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
@@ -54,7 +118,7 @@ sub handler_sleep {
 sub handler_check {
     # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
-    my $dbh = C4::Context->dbh;
+    my $dbh = get_db_connection();
     my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
     $sth->execute;
     my $data = $sth->fetchrow_hashref();
@@ -72,154 +136,205 @@ sub handler_check {
 sub zebraop {
     # execute operations waiting in the zebraqueue
     my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
-    my $dbh = C4::Context->dbh;
+    my $dbh = get_db_connection();
     my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0");
     $readsth->execute();
     Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
-    while (my $data = $readsth->fetchrow_hashref()) {
+
+    ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) {
         warn "Inside while loop" if $debug;
-        eval {
-            my $ok = 0;
-            my $record;
-            if ($data->{'operation'} =~ /delete/i ) {
-                eval {
-
-                    warn "Searching for record to delete" if $debug;
-                    # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
-                    my $Zconn = C4::Context->Zconn($data->{'server'}, 0, 1, '', 'xml');
-                    my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$data->{'biblio_auth_number'});
-                    $results->option(elementSetName => 'marcxml');
-                    $record = $results->record(0)->raw();
-                };
-                if ($@) {
-                    # this doesn't exist, so no need to wail on zebra to delete it
-                    if ($@->code() eq 13) {
-                        $ok = 1;
-                    } else {
-                        # caught a ZOOM::Exception
-                        my $error =
-                            $@->message() . " ("
-                            . $@->code() . ") "
-                            . $@->addinfo() . " "
-                            . $@->diagset();
-                        warn "ERROR: $error";
-                    }
-                } else {
-                    # then, delete the record
-                    warn "Deleting record" if $debug;
-                    $ok = zebrado($record, $data->{'operation'}, $data->{'server'}, $data->{'biblio_auth_number'});
-                }
-            }
-            else {
-                # it is an update           
-                warn "Updating record" if $debug;
-                # get the XML
-                my $marcxml;
-                if ($data->{'server'} eq "biblioserver") {
-                    my $marc = GetMarcBiblio($data->{'biblio_auth_number'});
-                    $marcxml = $marc->as_xml_record() if $marc;
-                } 
-                elsif ($data->{'server'} eq "authorityserver") {
-                    $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($data->{'biblio_auth_number'});
-                }
-                # check it's XML, just in case
-                eval {
-                    my $hashed = XMLin($marcxml);
-                }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
-                ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
-                if ($@) {
-                    Unix::Syslog::syslog LOG_ERR, "$@";
-                    my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
-                    $delsth->execute($data->{'id'});
-                    next;
-                }
-                # ok, we have everything, do the operation in zebra !
-                $ok = zebrado($marcxml, $data->{'operation'}, $data->{'server'}, $data->{'biblio_auth_number'});
-            }
-            if ($ok == 1) {
-                $dbh = C4::Context->dbh;
-                my $delsth;
-                # if it's a deletion, we can delete every request on this biblio : in case the user
-                # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
-                # that are pending and can't succeed, as we don't have the XML anymore
-                # so, delete everything for this biblionumber
-                if ($data->{'operation'} =~ /delete/i) {
-                    $delsth = $dbh->prepare("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ?");
-                    $delsth->execute($data->{'biblio_auth_number'});
-                    # if it's not a deletion, delete every pending specialUpdate for this biblionumber
-                    # in case the user add biblio, then X items, before this script runs
-                    # this avoid indexing X+1 times where just 1 is enough.
-                } else {
-                    $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 
-                                             WHERE biblio_auth_number = ? and operation = 'specialUpdate'");
-                    $delsth->execute($data->{'biblio_auth_number'});
-                }
-            }                            
-        };
-        if ($@) {
-            Unix::Syslog::syslog LOG_ERR, "$@";
+
+        my $id = $data->{'id'};
+        my $op = $data->{'operation'};
+        $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded
+                                                  # either delete_record or recordDelete
+        my $record_number = $data->{'biblio_auth_number'};
+        my $server = $data->{'server'};
+
+        next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number};
+
+        my $ok = 0;
+        my $record;
+        if ($op eq 'recordDelete') {
+            $ok = process_delete($dbh, $server, $record_number);
+        }
+        else {
+            $ok = process_update($dbh, $server, $record_number, $id);
         }
+        if ($ok == 1) {
+            mark_done($dbh, $record_number, $op);
+        }                            
     }
     $readsth->finish();
     $kernel->yield('status_check');
 }
 
+sub process_delete {
+    my $dbh = shift;
+    my $server = shift;
+    my $record_number = shift;
+
+    my $record;
+    my $ok = 0;
+    eval {
+        warn "Searching for record to delete" if $debug;
+        # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
+        my $Zconn =  get_zebra_connection($server);
+        my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number);
+        $results->option(elementSetName => 'marcxml');
+        $record = $results->record(0)->raw();
+    };
+    if ($@) {
+        # this doesn't exist, so no need to wail on zebra to delete it
+        if ($@->code() eq 13) {
+            $ok = 1;
+        } else {
+            # caught a ZOOM::Exception
+            my $error = _format_zoom_error_message($@);
+            warn "ERROR: $error";
+        }
+    } else {
+        # then, delete the record
+        warn "Deleting record" if $debug;
+        $ok = zebrado($record, 'recordDelete', $server, $record_number);
+    }
+    return $ok;
+}
+
+sub process_update {
+    my $dbh = shift;
+    my $server = shift;
+    my $record_number = shift;
+    my $id = shift;
+
+    my $record;
+    my $ok = 0;
+
+    warn "Updating record" if $debug;
+    # get the XML
+    my $marcxml;
+    if ($server eq "biblioserver") {
+        my $marc = GetMarcBiblio($record_number);
+        $marcxml = $marc->as_xml_record() if $marc;
+    } 
+    elsif ($server eq "authorityserver") {
+        $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($record_number);
+    }
+    # check it's XML, just in case
+    eval {
+        my $hashed = XMLin($marcxml);
+    }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
+    ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
+    if ($@) {
+        Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@";
+        mark_done_by_id($dbh, $id);
+        $ok = 0;
+    } else {
+        # ok, we have everything, do the operation in zebra !
+        $ok = zebrado($marcxml, 'specialUpdate', $server, $record_number);
+    }
+    return $ok;
+}
+
+sub mark_done_by_id {
+    my $dbh = shift;
+    my $id = shift;
+    my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
+    $delsth->execute($id);
+}
+
+sub mark_done {
+    my $dbh = shift;
+    my $record_number = shift;
+    my $op = shift;
+
+    my $delsth;
+    if ($op eq 'recordDelete') {
+        # if it's a deletion, we can delete every request on this biblio : in case the user
+        # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
+        # that are pending and can't succeed, as we don't have the XML anymore
+        # so, delete everything for this biblionumber
+        $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ? and operation = ?");
+        $delsth->execute($record_number, $op);
+    } else {
+        # if it's not a deletion, delete every pending specialUpdate for this biblionumber
+        # in case the user add biblio, then X items, before this script runs
+        # this avoid indexing X+1 times where just 1 is enough.
+        $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 
+                                 WHERE biblio_auth_number = ? and operation = 'specialUpdate'");
+        $delsth->execute($record_number);
+    }
+}
+
 sub zebrado {
     ###Accepts a $server variable thus we can use it to update  biblios, authorities or other zebra dbs
-    my ($record, $op, $server, $biblionumber) = @_;
-
-    warn "In zebrado" if $debug; 
-    my @port;
-    
-    my $tried = 0;
-    my $recon = 0;
-    my $reconnect = 0;
-#    $record=Encode::encode("UTF-8",$record);
-    my $shadow = $server."shadow";
-    $op = 'recordDelete' if $op eq 'delete_record';
-
-reconnect:
-    warn "At reconnect" if $debug; 
-    my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
-    if ($record) {
-        warn "Record found" if $debug;
+    my ($record, $op, $server, $record_number) = @_;
+
+    unless ($record) {
+        my $message = "error updating index for $server $record $record_number: no source record";
+        postpone_update($server, $record_number, $message);
+        return 0;
+    }
+
+    my $attempts = 0;
+    my $ok = 0;
+    ATTEMPT: while ($attempts < $max_operation_attempts) {
+        $attempts++;
+        warn "Attempt $attempts for $op for $server $record_number" if $debug;
+        my $Zconn = get_zebra_connection($server);
+
         my $Zpackage = $Zconn->package();
         $Zpackage->option(action => $op);
         $Zpackage->option(record => $record);
-#       $Zpackage->option(recordIdOpaque => $biblionumber) if $biblionumber;
-retry:
-        warn "At Retry" if $debug;
+
         eval { $Zpackage->send("update") };
         if ($@ && $@->isa("ZOOM::Exception")) {
-            print "Oops!  ", $@->message(), "\n";
-            return $@->code();
+            my $message = _format_zoom_error_message($@);
+            my $error = $@->code();
+            if (exists $retriable_zoom_errors{$error}) {
+                warn "reattempting operation $op for $server $record_number" if $debug;
+                warn "last Zebra error was $message" if $debug;
+                $Zpackage->destroy();
+
+                if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) {
+                    # bump up connection timeout
+                    $zconn_timeout = POSIX::ceil($zconn_timeout * $zconn_timeout_multiplier);
+                    $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout;
+                    Unix::Syslog::syslog LOG_INFO, "increased Zebra connection timeout to $zconn_timeout\n";
+                    warn "increased Zebra connection timeout to $zconn_timeout" if $debug;
+                }
+                next ATTEMPT;
+            } else {
+                postpone_update($server, $record_number, $message);
+            }
         }
-        my($error, $errmsg, $addinfo, $diagset) = $Zconn->error_x();
-        if ($error == 10007 && $tried < 3) {## timeout --another 30 looonng seconds for this update
-            sleep 1;    ##  wait a sec!
-            $tried++;
-            goto "retry";
-        } elsif ($error == 2 && $tried < 2) {## timeout --temporary zebra error !whatever that means
-            sleep 2;    ##  wait two seconds!
-            $tried++;
-            goto "retry";
-        } elsif ($error==10004 && $recon == 0) {##Lost connection -reconnect
-            sleep 1;    ##  wait a sec!
-            $recon = 1;
-            $Zpackage->destroy();
-            $Zconn->destroy();
-            goto "reconnect";
-        } elsif ($error) {
-            $Zpackage->destroy();
-            $Zconn->destroy();
-            return 0;
+        eval { $Zpackage->send('commit'); };
+        if ($@) {
+            # operation succeeded, but commit
+            # did not - we have a problem
+            my $message = _format_zoom_error_message($@);
+            postpone_update($server, $record_number, $message);
+        } else {
+            $ok = 1;
+            last ATTEMPT;
         }
-        $Zpackage->send('commit');
-        return 1;
     }
-    return 0;
+
+    unless ($ok) {
+        my $message = "Made $attempts attempts to index $server record $record_number without success";
+        postpone_update($server, $record_number, $message);
+    }
+
+    return $ok; 
 }
 
+sub postpone_update {
+    my ($server, $record_number, $message) = @_;
+    warn $message if $debug;
+    $message .= "\n" unless $message =~ /\n$/;
+    Unix::Syslog::syslog LOG_ERR, $message;
+    $postponed_updates->{$server}->{$record_number} = 1;
+}
 
 sub handler_stop {
     my $heap = $_[HEAP];
@@ -228,6 +343,87 @@ sub handler_stop {
     delete $heap->{session};
 }
 
+# get a DB connection
+sub get_db_connection {
+    my $dbh;
+
+    $db_connection_wait = $min_connection_wait unless defined $db_connection_wait;
+    while (1) {
+        eval {
+            # note that C4::Context caches the
+            # DB handle; C4::Context->dbh() will
+            # check that handle first before returning
+            # it.  If the connection is bad, it
+            # then tries (once) to create a new one.
+            $dbh = C4::Context->dbh();
+        };
+
+        unless ($@) {
+            # C4::Context->dbh dies if it cannot
+            # establish a connection
+            $db_connection_wait = $min_connection_wait;
+            return $dbh;
+        }
+
+        # connection failed
+        my $error = "failed to connect to DB: $DBI::errstr";
+        warn $error if $debug;
+        Unix::Syslog::syslog LOG_ERR, $error;
+        sleep $db_connection_wait;
+        $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait;
+    }
+}
+
+# get a Zebra connection
+sub get_zebra_connection {
+    my $server = shift;
+
+    # start connection retry wait queue if necessary
+    $zoom_connection_waits{$server} = $min_connection_wait unless exists  $zoom_connection_waits{$server};
+
+    # try to connect to Zebra forever until we succeed
+    while (1) {
+        # what follows assumes that C4::Context->Zconn 
+        # makes only one attempt to create a new connection;
+        my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
+        $Zconn->option('timeout' => $zconn_timeout);
+
+        # it is important to note that if the existing connection
+        # stored by C4::Context has an error (any type of error)
+        # from the last transaction, C4::Context->Zconn closes
+        # it and establishes a new one.  Therefore, the
+        # following check will succeed if we have a new, good 
+        # connection or we're using a previously established
+        # connection that has experienced no errors.
+        if ($Zconn->errcode() == 0) {
+            $zoom_connection_waits{$server} = $min_connection_wait;
+            return $Zconn;
+        }
+
+        # connection failed
+        my $error = _format_zoom_error_message($Zconn);
+        warn $error if $debug;
+        Unix::Syslog::syslog LOG_ERR, $error;
+        sleep $zoom_connection_waits{$server};
+        $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait;
+    }
+}
+
+# given a ZOOM::Exception or
+# ZOOM::Connection object, generate
+# a human-reaable error message
+sub _format_zoom_error_message {
+    my $err = shift;
+
+    my $message = "";
+    if (ref($err) eq 'ZOOM::Connection') {
+        $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo();
+    } elsif (ref($err) eq 'ZOOM::Exception') {
+        $message = $err->message() . " (" . $err->diagset . " " .  $err->code() . ") " . $err->addinfo();
+    }
+    return $message; 
+}
+
 POE::Session->create(
     inline_states => {
         _start       => \&handler_start,