use FindBin;
eval { require "$FindBin::Bin/kohalib.pl" };
}
+
use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
use Unix::Syslog qw(:macros);
use C4::Search;
use C4::AuthoritiesMarc;
use XML::Simple;
+use POSIX;
use utf8;
-my $dbh = C4::Context->dbh;
+
+# wait periods governing connection attempts
+my $min_connection_wait = 1; # start off at 1 second
+my $max_connection_wait = 1024; # max about 17 minutes
+
+# keep separate wait period for bib and authority Zebra databases
+my %zoom_connection_waits = ();
+
+my $db_connection_wait = $min_connection_wait;
+
+# ZOOM and Z39.50 errors that are potentially
+# resolvable by connecting again and retrying
+# the operation
+my %retriable_zoom_errors = (
+ 10000 => 'ZOOM_ERROR_CONNECT',
+ 10001 => 'ZOOM_ERROR_MEMORY',
+ 10002 => 'ZOOM_ERROR_ENCODE',
+ 10003 => 'ZOOM_ERROR_DECODE',
+ 10004 => 'ZOOM_ERROR_CONNECTION_LOST',
+ 10005 => 'ZOOM_ERROR_INIT',
+ 10006 => 'ZOOM_ERROR_INTERNAL',
+ 10007 => 'ZOOM_ERROR_TIMEOUT',
+);
+
+# structure to store updates that have
+# failed and are to be retrieved. The
+# structure is a hashref of hashrefs,
+# e.g.,
+#
+# $postoned_updates->{$server}->{$record_number} = 1;
+#
+# If an operation is attempted and fails because
+# of a retriable error (see above), the daemon
+# will try several times to recover as follows:
+#
+# 1. close and reopen the connection to the
+# Zebra server, unless the error was a timeout,
+# in which case
+# 2. retry the operation
+#
+# If, after trying this five times, the operation still
+# fails, the daemon will mark the record number as
+# postponed, and try to process other entries in
+# zebraqueue. When an update is postponed, the
+# error will be reported to syslog.
+#
+# If more than 100 postponed updates are
+# accumulated, the daemon will assume that
+# something is seriously wrong, complain loudly,
+# and abort. If running under the daemon(1) command,
+# this means that the daemon will respawn.
+#
+my $num_postponed_updates = 0;
+my $postponed_updates = {};
+
+my $max_operation_attempts = 5;
+my $max_postponed_updates = 100;
+
+# Zebra connection timeout
+my $zconn_timeout = 30;
+my $zconn_timeout_multiplier = 1.5;
+my $max_zconn_timeout = 120;
+
my $ident = "Koha Zebraqueue ";
-my $debug = 1;
+my $debug = 0;
Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0;
Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
sub handler_check {
# check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
- my $dbh = C4::Context->dbh;
+ my $dbh = get_db_connection();
my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
$sth->execute;
my $data = $sth->fetchrow_hashref();
sub zebraop {
# execute operations waiting in the zebraqueue
my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
- my $dbh = C4::Context->dbh;
+ my $dbh = get_db_connection();
my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0");
$readsth->execute();
Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
- while (my $data = $readsth->fetchrow_hashref()) {
+
+ ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) {
warn "Inside while loop" if $debug;
- eval {
- my $ok = 0;
- my $record;
- if ($data->{'operation'} =~ /delete/i ) {
- eval {
-
- warn "Searching for record to delete" if $debug;
- # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
- my $Zconn = C4::Context->Zconn($data->{'server'}, 0, 1, '', 'xml');
- my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$data->{'biblio_auth_number'});
- $results->option(elementSetName => 'marcxml');
- $record = $results->record(0)->raw();
- };
- if ($@) {
- # this doesn't exist, so no need to wail on zebra to delete it
- if ($@->code() eq 13) {
- $ok = 1;
- } else {
- # caught a ZOOM::Exception
- my $error =
- $@->message() . " ("
- . $@->code() . ") "
- . $@->addinfo() . " "
- . $@->diagset();
- warn "ERROR: $error";
- }
- } else {
- # then, delete the record
- warn "Deleting record" if $debug;
- $ok = zebrado($record, $data->{'operation'}, $data->{'server'}, $data->{'biblio_auth_number'});
- }
- }
- else {
- # it is an update
- warn "Updating record" if $debug;
- # get the XML
- my $marcxml;
- if ($data->{'server'} eq "biblioserver") {
- my $marc = GetMarcBiblio($data->{'biblio_auth_number'});
- $marcxml = $marc->as_xml_record() if $marc;
- }
- elsif ($data->{'server'} eq "authorityserver") {
- $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($data->{'biblio_auth_number'});
- }
- # check it's XML, just in case
- eval {
- my $hashed = XMLin($marcxml);
- }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
- ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
- if ($@) {
- Unix::Syslog::syslog LOG_ERR, "$@";
- my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
- $delsth->execute($data->{'id'});
- next;
- }
- # ok, we have everything, do the operation in zebra !
- $ok = zebrado($marcxml, $data->{'operation'}, $data->{'server'}, $data->{'biblio_auth_number'});
- }
- if ($ok == 1) {
- $dbh = C4::Context->dbh;
- my $delsth;
- # if it's a deletion, we can delete every request on this biblio : in case the user
- # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
- # that are pending and can't succeed, as we don't have the XML anymore
- # so, delete everything for this biblionumber
- if ($data->{'operation'} =~ /delete/i) {
- $delsth = $dbh->prepare("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ?");
- $delsth->execute($data->{'biblio_auth_number'});
- # if it's not a deletion, delete every pending specialUpdate for this biblionumber
- # in case the user add biblio, then X items, before this script runs
- # this avoid indexing X+1 times where just 1 is enough.
- } else {
- $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1
- WHERE biblio_auth_number = ? and operation = 'specialUpdate'");
- $delsth->execute($data->{'biblio_auth_number'});
- }
- }
- };
- if ($@) {
- Unix::Syslog::syslog LOG_ERR, "$@";
+
+ my $id = $data->{'id'};
+ my $op = $data->{'operation'};
+ $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded
+ # either delete_record or recordDelete
+ my $record_number = $data->{'biblio_auth_number'};
+ my $server = $data->{'server'};
+
+ next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number};
+
+ my $ok = 0;
+ my $record;
+ if ($op eq 'recordDelete') {
+ $ok = process_delete($dbh, $server, $record_number);
+ }
+ else {
+ $ok = process_update($dbh, $server, $record_number, $id);
}
+ if ($ok == 1) {
+ mark_done($dbh, $record_number, $op);
+ }
}
$readsth->finish();
$kernel->yield('status_check');
}
+sub process_delete {
+ my $dbh = shift;
+ my $server = shift;
+ my $record_number = shift;
+
+ my $record;
+ my $ok = 0;
+ eval {
+ warn "Searching for record to delete" if $debug;
+ # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
+ my $Zconn = get_zebra_connection($server);
+ my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number);
+ $results->option(elementSetName => 'marcxml');
+ $record = $results->record(0)->raw();
+ };
+ if ($@) {
+ # this doesn't exist, so no need to wail on zebra to delete it
+ if ($@->code() eq 13) {
+ $ok = 1;
+ } else {
+ # caught a ZOOM::Exception
+ my $error = _format_zoom_error_message($@);
+ warn "ERROR: $error";
+ }
+ } else {
+ # then, delete the record
+ warn "Deleting record" if $debug;
+ $ok = zebrado($record, 'recordDelete', $server, $record_number);
+ }
+ return $ok;
+}
+
+sub process_update {
+ my $dbh = shift;
+ my $server = shift;
+ my $record_number = shift;
+ my $id = shift;
+
+ my $record;
+ my $ok = 0;
+
+ warn "Updating record" if $debug;
+ # get the XML
+ my $marcxml;
+ if ($server eq "biblioserver") {
+ my $marc = GetMarcBiblio($record_number);
+ $marcxml = $marc->as_xml_record() if $marc;
+ }
+ elsif ($server eq "authorityserver") {
+ $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($record_number);
+ }
+ # check it's XML, just in case
+ eval {
+ my $hashed = XMLin($marcxml);
+ }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
+ ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
+ if ($@) {
+ Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@";
+ mark_done_by_id($dbh, $id);
+ $ok = 0;
+ } else {
+ # ok, we have everything, do the operation in zebra !
+ $ok = zebrado($marcxml, 'specialUpdate', $server, $record_number);
+ }
+ return $ok;
+}
+
+sub mark_done_by_id {
+ my $dbh = shift;
+ my $id = shift;
+ my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
+ $delsth->execute($id);
+}
+
+sub mark_done {
+ my $dbh = shift;
+ my $record_number = shift;
+ my $op = shift;
+
+ my $delsth;
+ if ($op eq 'recordDelete') {
+ # if it's a deletion, we can delete every request on this biblio : in case the user
+ # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
+ # that are pending and can't succeed, as we don't have the XML anymore
+ # so, delete everything for this biblionumber
+ $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ? and operation = ?");
+ $delsth->execute($record_number, $op);
+ } else {
+ # if it's not a deletion, delete every pending specialUpdate for this biblionumber
+ # in case the user add biblio, then X items, before this script runs
+ # this avoid indexing X+1 times where just 1 is enough.
+ $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1
+ WHERE biblio_auth_number = ? and operation = 'specialUpdate'");
+ $delsth->execute($record_number);
+ }
+}
+
sub zebrado {
###Accepts a $server variable thus we can use it to update biblios, authorities or other zebra dbs
- my ($record, $op, $server, $biblionumber) = @_;
-
- warn "In zebrado" if $debug;
- my @port;
-
- my $tried = 0;
- my $recon = 0;
- my $reconnect = 0;
-# $record=Encode::encode("UTF-8",$record);
- my $shadow = $server."shadow";
- $op = 'recordDelete' if $op eq 'delete_record';
-
-reconnect:
- warn "At reconnect" if $debug;
- my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
- if ($record) {
- warn "Record found" if $debug;
+ my ($record, $op, $server, $record_number) = @_;
+
+ unless ($record) {
+ my $message = "error updating index for $server $record $record_number: no source record";
+ postpone_update($server, $record_number, $message);
+ return 0;
+ }
+
+ my $attempts = 0;
+ my $ok = 0;
+ ATTEMPT: while ($attempts < $max_operation_attempts) {
+ $attempts++;
+ warn "Attempt $attempts for $op for $server $record_number" if $debug;
+ my $Zconn = get_zebra_connection($server);
+
my $Zpackage = $Zconn->package();
$Zpackage->option(action => $op);
$Zpackage->option(record => $record);
-# $Zpackage->option(recordIdOpaque => $biblionumber) if $biblionumber;
-retry:
- warn "At Retry" if $debug;
+
eval { $Zpackage->send("update") };
if ($@ && $@->isa("ZOOM::Exception")) {
- print "Oops! ", $@->message(), "\n";
- return $@->code();
+ my $message = _format_zoom_error_message($@);
+ my $error = $@->code();
+ if (exists $retriable_zoom_errors{$error}) {
+ warn "reattempting operation $op for $server $record_number" if $debug;
+ warn "last Zebra error was $message" if $debug;
+ $Zpackage->destroy();
+
+ if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) {
+ # bump up connection timeout
+ $zconn_timeout = POSIX::ceil($zconn_timeout * $zconn_timeout_multiplier);
+ $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout;
+ Unix::Syslog::syslog LOG_INFO, "increased Zebra connection timeout to $zconn_timeout\n";
+ warn "increased Zebra connection timeout to $zconn_timeout" if $debug;
+ }
+ next ATTEMPT;
+ } else {
+ postpone_update($server, $record_number, $message);
+ }
}
- my($error, $errmsg, $addinfo, $diagset) = $Zconn->error_x();
- if ($error == 10007 && $tried < 3) {## timeout --another 30 looonng seconds for this update
- sleep 1; ## wait a sec!
- $tried++;
- goto "retry";
- } elsif ($error == 2 && $tried < 2) {## timeout --temporary zebra error !whatever that means
- sleep 2; ## wait two seconds!
- $tried++;
- goto "retry";
- } elsif ($error==10004 && $recon == 0) {##Lost connection -reconnect
- sleep 1; ## wait a sec!
- $recon = 1;
- $Zpackage->destroy();
- $Zconn->destroy();
- goto "reconnect";
- } elsif ($error) {
- $Zpackage->destroy();
- $Zconn->destroy();
- return 0;
+ eval { $Zpackage->send('commit'); };
+ if ($@) {
+ # operation succeeded, but commit
+ # did not - we have a problem
+ my $message = _format_zoom_error_message($@);
+ postpone_update($server, $record_number, $message);
+ } else {
+ $ok = 1;
+ last ATTEMPT;
}
- $Zpackage->send('commit');
- return 1;
}
- return 0;
+
+ unless ($ok) {
+ my $message = "Made $attempts attempts to index $server record $record_number without success";
+ postpone_update($server, $record_number, $message);
+ }
+
+ return $ok;
}
+sub postpone_update {
+ my ($server, $record_number, $message) = @_;
+ warn $message if $debug;
+ $message .= "\n" unless $message =~ /\n$/;
+ Unix::Syslog::syslog LOG_ERR, $message;
+ $postponed_updates->{$server}->{$record_number} = 1;
+}
sub handler_stop {
my $heap = $_[HEAP];
delete $heap->{session};
}
+# get a DB connection
+sub get_db_connection {
+ my $dbh;
+
+ $db_connection_wait = $min_connection_wait unless defined $db_connection_wait;
+ while (1) {
+ eval {
+ # note that C4::Context caches the
+ # DB handle; C4::Context->dbh() will
+ # check that handle first before returning
+ # it. If the connection is bad, it
+ # then tries (once) to create a new one.
+ $dbh = C4::Context->dbh();
+ };
+
+ unless ($@) {
+ # C4::Context->dbh dies if it cannot
+ # establish a connection
+ $db_connection_wait = $min_connection_wait;
+ return $dbh;
+ }
+
+ # connection failed
+ my $error = "failed to connect to DB: $DBI::errstr";
+ warn $error if $debug;
+ Unix::Syslog::syslog LOG_ERR, $error;
+ sleep $db_connection_wait;
+ $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait;
+ }
+}
+
+# get a Zebra connection
+sub get_zebra_connection {
+ my $server = shift;
+
+ # start connection retry wait queue if necessary
+ $zoom_connection_waits{$server} = $min_connection_wait unless exists $zoom_connection_waits{$server};
+
+ # try to connect to Zebra forever until we succeed
+ while (1) {
+ # what follows assumes that C4::Context->Zconn
+ # makes only one attempt to create a new connection;
+ my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
+ $Zconn->option('timeout' => $zconn_timeout);
+
+ # it is important to note that if the existing connection
+ # stored by C4::Context has an error (any type of error)
+ # from the last transaction, C4::Context->Zconn closes
+ # it and establishes a new one. Therefore, the
+ # following check will succeed if we have a new, good
+ # connection or we're using a previously established
+ # connection that has experienced no errors.
+ if ($Zconn->errcode() == 0) {
+ $zoom_connection_waits{$server} = $min_connection_wait;
+ return $Zconn;
+ }
+
+ # connection failed
+ my $error = _format_zoom_error_message($Zconn);
+ warn $error if $debug;
+ Unix::Syslog::syslog LOG_ERR, $error;
+ sleep $zoom_connection_waits{$server};
+ $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait;
+ }
+}
+
+# given a ZOOM::Exception or
+# ZOOM::Connection object, generate
+# a human-reaable error message
+sub _format_zoom_error_message {
+ my $err = shift;
+
+ my $message = "";
+ if (ref($err) eq 'ZOOM::Connection') {
+ $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo();
+ } elsif (ref($err) eq 'ZOOM::Exception') {
+ $message = $err->message() . " (" . $err->diagset . " " . $err->code() . ") " . $err->addinfo();
+ }
+ return $message;
+}
+
POE::Session->create(
inline_states => {
_start => \&handler_start,