b32f8ba6adf828576743b892b93ccbcfc9ddcc76
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use XML::LibXML;
30
31 use constant LOCK_FILENAME => 'rebuild..LCK';
32
33 # script that checks zebradir structure & create directories & mandatory files if needed
34 #
35 #
36
37 $|=1; # flushes output
38 # If the cron job starts us in an unreadable dir, we will break without
39 # this.
40 chdir $ENV{HOME} if (!(-r '.'));
41 my $daemon_mode;
42 my $daemon_sleep = 5;
43 my $directory;
44 my $nosanitize;
45 my $skip_export;
46 my $keep_export;
47 my $skip_index;
48 my $reset;
49 my $biblios;
50 my $authorities;
51 my $as_usmarc;
52 my $as_xml;
53 my $noshadow;
54 my $want_help;
55 my $process_zebraqueue;
56 my $process_zebraqueue_skip_deletes;
57 my $do_not_clear_zebraqueue;
58 my $length;
59 my $where;
60 my $offset;
61 my $run_as_root;
62 my $run_user = (getpwuid($<))[0];
63 my $wait_for_lock = 0;
64 my $use_flock;
65 my $table = 'biblioitems';
66
67 my $verbose_logging = 0;
68 my $zebraidx_log_opt = " -v none,fatal,warn ";
69 my $result = GetOptions(
70     'daemon'        => \$daemon_mode,
71     'sleep:i'       => \$daemon_sleep,
72     'd:s'           => \$directory,
73     'r|reset'       => \$reset,
74     's'             => \$skip_export,
75     'k'             => \$keep_export,
76     'I|skip-index'  => \$skip_index,
77     'nosanitize'    => \$nosanitize,
78     'b'             => \$biblios,
79     'noxml'         => \$as_usmarc,
80     'w'             => \$noshadow,
81     'a'             => \$authorities,
82     'h|help'        => \$want_help,
83     'x'             => \$as_xml,
84     'y'             => \$do_not_clear_zebraqueue,
85     'z'             => \$process_zebraqueue,
86     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
87     'where:s'       => \$where,
88     'length:i'      => \$length,
89     'offset:i'      => \$offset,
90     'v+'            => \$verbose_logging,
91     'run-as-root'   => \$run_as_root,
92     'wait-for-lock' => \$wait_for_lock,
93     't|table:s'     => \$table,
94 );
95
96 if (not $result or $want_help) {
97     print_usage();
98     exit 0;
99 }
100
101 if ( $as_xml ) {
102     warn "Warning: You passed -x which is already the default and is now deprecated\n";
103     undef $as_xml; # Should not be used later
104 }
105
106 if( not defined $run_as_root and $run_user eq 'root') {
107     my $msg = "Warning: You are running this script as the user 'root'.\n";
108     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
109     $msg   .= "Please do '$0 --help' to see usage.\n";
110     die $msg;
111 }
112
113 if ( $as_usmarc and $nosanitize ) {
114     my $msg = "Cannot specify both -noxml and -nosanitize\n";
115     $msg   .= "Please do '$0 --help' to see usage.\n";
116     die $msg;
117 }
118
119 if ($process_zebraqueue and ($skip_export or $reset)) {
120     my $msg = "Cannot specify -r or -s if -z is specified\n";
121     $msg   .= "Please do '$0 --help' to see usage.\n";
122     die $msg;
123 }
124
125 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
126     my $msg = "Cannot specify both -y and -z\n";
127     $msg   .= "Please do '$0 --help' to see usage.\n";
128     die $msg;
129 }
130
131 if ($daemon_mode) {
132     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
133     if ($skip_export or $keep_export or $skip_index or
134           $where or $length or $offset) {
135         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
136         $msg   .= "Please do '$0 --help' to see usage.\n";
137         die $msg;
138     }
139     $authorities = 1;
140     $biblios = 1;
141     $process_zebraqueue = 1;
142 }
143
144 if (not $biblios and not $authorities) {
145     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
146     $msg   .= "Please do '$0 --help' to see usage.\n";
147     die $msg;
148 }
149
150 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
151 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
152     die "Cannot specify -t|--table with value '$table'. Only "
153       . ( join ', ', @tables_allowed_for_select )
154       . " are allowed.";
155 }
156
157
158 #  -v is for verbose, which seems backwards here because of how logging is set
159 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
160 if ($verbose_logging >= 2) {
161     $zebraidx_log_opt = '-v none,fatal,warn,all';
162 }
163
164 my $use_tempdir = 0;
165 unless ($directory) {
166     $use_tempdir = 1;
167     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
168 }
169
170
171 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
172 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
173
174 my $kohadir = C4::Context->config('intranetdir');
175 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
176 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
177
178 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
179 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
180
181 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
182 <collection xmlns="http://www.loc.gov/MARC21/slim">
183 };
184
185 my $marcxml_close = q{
186 </collection>
187 };
188
189 # Protect again simultaneous update of the zebra index by using a lock file.
190 # Create our own lock directory if its missing.  This shouild be created
191 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
192 # does not exist and cannot be created, we fall back on /tmp - which will
193 # always work.
194
195 my ($lockfile, $LockFH);
196 foreach (
197     C4::Context->config("zebra_lockdir"),
198     '/var/lock/zebra_' . C4::Context->config('database'),
199     '/tmp/zebra_' . C4::Context->config('database')
200 ) {
201     #we try three possibilities (we really want to lock :)
202     next if !$_;
203     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
204     last if defined $LockFH;
205 }
206 if( !defined $LockFH ) {
207     print "WARNING: Could not create lock file $lockfile: $!\n";
208     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
209     print "Verify file permissions for it too.\n";
210     $use_flock = 0; # we disable file locking now and will continue
211                     # without it
212                     # note that this mimics old behavior (before we used
213                     # the lockfile)
214 };
215
216 if ( $verbose_logging ) {
217     print "Zebra configuration information\n";
218     print "================================\n";
219     print "Zebra biblio directory      = $biblioserverdir\n";
220     print "Zebra authorities directory = $authorityserverdir\n";
221     print "Koha directory              = $kohadir\n";
222     print "Lockfile                    = $lockfile\n" if $lockfile;
223     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
224     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
225     print "================================\n";
226 }
227
228 my $tester = XML::LibXML->new();
229 my $dbh;
230
231 # The main work is done here by calling do_one_pass().  We have added locking
232 # avoid race conditions between full rebuilds and incremental updates either from
233 # daemon mode or periodic invocation from cron.  The race can lead to an updated
234 # record being overwritten by a rebuild if the update is applied after the export
235 # by the rebuild and before the rebuild finishes (more likely to affect large
236 # catalogs).
237 #
238 # We have chosen to exit immediately by default if we cannot obtain the lock
239 # to prevent the potential for a infinite backlog from cron invocations, but an
240 # option (wait-for-lock) is provided to let the program wait for the lock.
241 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
242 if ($daemon_mode) {
243     while (1) {
244         # For incremental updates, skip the update if the updates are locked
245         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
246             eval {
247                 $dbh = C4::Context->dbh;
248                 do_one_pass() if ( zebraqueue_not_empty() );
249             };
250             if ($@ && $verbose_logging) {
251                 warn "Warning : $@\n";
252             }
253             _flock($LockFH, LOCK_UN);
254         }
255         sleep $daemon_sleep;
256     }
257 } else {
258     # all one-off invocations
259     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
260     if (_flock($LockFH, $lock_mode)) {
261         $dbh = C4::Context->dbh;
262         do_one_pass();
263         _flock($LockFH, LOCK_UN);
264     } else {
265         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
266     }
267 }
268
269
270 if ( $verbose_logging ) {
271     print "====================\n";
272     print "CLEANING\n";
273     print "====================\n";
274 }
275 if ($keep_export) {
276     print "NOTHING cleaned : the export $directory has been kept.\n";
277     print "You can re-run this script with the -s ";
278     if ($use_tempdir) {
279         print " and -d $directory parameters";
280     } else {
281         print "parameter";
282     }
283     print "\n";
284     print "if you just want to rebuild zebra after changing the record.abs\n";
285     print "or another zebra config file\n";
286 } else {
287     unless ($use_tempdir) {
288         # if we're using a temporary directory
289         # created by File::Temp, it will be removed
290         # automatically.
291         rmtree($directory, 0, 1);
292         print "directory $directory deleted\n";
293     }
294 }
295
296 sub do_one_pass {
297     if ($authorities) {
298         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
299     } else {
300         print "skipping authorities\n" if ( $verbose_logging );
301     }
302
303     if ($biblios) {
304         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
305     } else {
306         print "skipping biblios\n" if ( $verbose_logging );
307     }
308 }
309
310 # Check the zebra update queue and return true if there are records to process
311 # This routine will handle each of -ab, -a, or -b, but in practice we force
312 # -ab when in daemon mode.
313 sub zebraqueue_not_empty {
314     my $where_str;
315
316     if ($authorities && $biblios) {
317         $where_str = 'done = 0;';
318     } elsif ($biblios) {
319         $where_str = 'server = "biblioserver" AND done = 0;';
320     } else {
321         $where_str = 'server = "authorityserver" AND done = 0;';
322     }
323     my $query =
324         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
325
326     $query->execute;
327     my $count = $query->fetchrow_arrayref->[0];
328     print "queued records: $count\n" if $verbose_logging > 0;
329     return $count > 0;
330 }
331
332 # This checks to see if the zebra directories exist under the provided path.
333 # If they don't, then zebra is likely to spit the dummy. This returns true
334 # if the directories had to be created, false otherwise.
335 sub check_zebra_dirs {
336     my ($base) = shift() . '/';
337     my $needed_repairing = 0;
338     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
339     foreach my $dir (@dirs) {
340         my $bdir = $base . $dir;
341         if (! -d $bdir) {
342             $needed_repairing = 1;
343             mkdir $bdir || die "Unable to create '$bdir': $!\n";
344             print "$0: needed to create '$bdir'\n";
345         }
346     }
347     return $needed_repairing;
348 }   # ----------  end of subroutine check_zebra_dirs  ----------
349
350 sub index_records {
351     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
352
353     my $num_records_exported = 0;
354     my $records_deleted = {};
355     my $need_reset = check_zebra_dirs($server_dir);
356     if ($need_reset) {
357         print "$0: found broken zebra server directories: forcing a rebuild\n";
358         $reset = 1;
359     }
360     if ($skip_export && $verbose_logging) {
361         print "====================\n";
362         print "SKIPPING $record_type export\n";
363         print "====================\n";
364     } else {
365         if ( $verbose_logging ) {
366             print "====================\n";
367             print "exporting $record_type\n";
368             print "====================\n";
369         }
370         mkdir "$directory" unless (-d $directory);
371         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
372         if ($process_zebraqueue) {
373             my $entries;
374
375             unless ( $process_zebraqueue_skip_deletes ) {
376                 $entries = select_zebraqueue_records($record_type, 'deleted');
377                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
378                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_usmarc);
379                 mark_zebraqueue_batch_done($entries);
380             }
381
382             $entries = select_zebraqueue_records($record_type, 'updated');
383             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
384             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_usmarc, $records_deleted);
385             mark_zebraqueue_batch_done($entries);
386
387         } else {
388             my $sth = select_all_records($record_type);
389             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_usmarc, $nosanitize);
390             unless ($do_not_clear_zebraqueue) {
391                 mark_all_zebraqueue_done($record_type);
392             }
393         }
394     }
395
396     #
397     # and reindexing everything
398     #
399     if ($skip_index) {
400         if ($verbose_logging) {
401             print "====================\n";
402             print "SKIPPING $record_type indexing\n";
403             print "====================\n";
404         }
405     } else {
406         if ( $verbose_logging ) {
407             print "====================\n";
408             print "REINDEXING zebra\n";
409             print "====================\n";
410         }
411         my $record_fmt = ($as_usmarc) ? 'iso2709' : 'marcxml' ;
412         if ($process_zebraqueue) {
413             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
414                 if %$records_deleted;
415             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
416                 if $num_records_exported;
417         } else {
418             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
419                 if ($num_records_exported or $skip_export);
420         }
421     }
422 }
423
424
425 sub select_zebraqueue_records {
426     my ($record_type, $update_type) = @_;
427
428     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
429     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
430
431     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
432                              FROM zebraqueue
433                              WHERE server = ?
434                              AND   operation = ?
435                              AND   done = 0
436                              ORDER BY id DESC");
437     $sth->execute($server, $op);
438     my $entries = $sth->fetchall_arrayref({});
439 }
440
441 sub mark_all_zebraqueue_done {
442     my ($record_type) = @_;
443
444     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
445
446     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
447                              WHERE server = ?
448                              AND done = 0");
449     $sth->execute($server);
450 }
451
452 sub mark_zebraqueue_batch_done {
453     my ($entries) = @_;
454
455     $dbh->{AutoCommit} = 0;
456     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
457     $dbh->commit();
458     foreach my $id (map { $_->{id} } @$entries) {
459         $sth->execute($id);
460     }
461     $dbh->{AutoCommit} = 1;
462 }
463
464 sub select_all_records {
465     my $record_type = shift;
466     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
467 }
468
469 sub select_all_authorities {
470     my $strsth=qq{SELECT authid FROM auth_header};
471     $strsth.=qq{ WHERE $where } if ($where);
472     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
473     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
474     my $sth = $dbh->prepare($strsth);
475     $sth->execute();
476     return $sth;
477 }
478
479 sub select_all_biblios {
480     $table = 'biblioitems'
481       unless grep { /^$table$/ } @tables_allowed_for_select;
482     my $strsth = qq{ SELECT biblionumber FROM $table };
483     $strsth.=qq{ WHERE $where } if ($where);
484     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
485     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
486     my $sth = $dbh->prepare($strsth);
487     $sth->execute();
488     return $sth;
489 }
490
491 sub export_marc_records_from_sth {
492     my ($record_type, $sth, $directory, $as_usmarc, $nosanitize) = @_;
493
494     my $num_exported = 0;
495     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
496
497     print {$fh} $marcxml_open
498         unless $as_usmarc;
499
500     my $i = 0;
501     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
502     while (my ($record_number) = $sth->fetchrow_array) {
503         print "." if ( $verbose_logging );
504         print "\r$i" unless ($i++ %100 or !$verbose_logging);
505         if ( $nosanitize ) {
506             my $marcxml = $record_type eq 'biblio'
507                           ? GetXmlBiblio( $record_number )
508                           : GetAuthorityXML( $record_number );
509             if ($record_type eq 'biblio'){
510                 my @items = GetItemsInfo($record_number);
511                 if (@items){
512                     my $record = MARC::Record->new;
513                     $record->encoding('UTF-8');
514                     my @itemsrecord;
515                     foreach my $item (@items){
516                         my $record = Item2Marc($item, $record_number);
517                         push @itemsrecord, $record->field($itemtag);
518                     }
519                     $record->insert_fields_ordered(@itemsrecord);
520                     my $itemsxml = $record->as_xml_record();
521                     $marcxml =
522                         substr($marcxml, 0, length($marcxml)-10) .
523                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
524                 }
525             }
526             # extra test to ensure that result is valid XML; otherwise
527             # Zebra won't parse it in DOM mode
528             eval {
529                 my $doc = $tester->parse_string($marcxml);
530             };
531             if ($@) {
532                 warn "Error exporting record $record_number ($record_type): $@\n";
533                 next;
534             }
535             if ( $marcxml ) {
536                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
537                 print {$fh} $marcxml;
538                 $num_exported++;
539             }
540             next;
541         }
542         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
543         if (defined $marc) {
544             eval {
545                 my $rec;
546                 if ($as_usmarc) {
547                     $rec = $marc->as_usmarc();
548                 } else {
549                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
550                     eval {
551                         my $doc = $tester->parse_string($rec);
552                     };
553                     if ($@) {
554                         die "invalid XML: $@";
555                     }
556                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
557                 }
558                 print {$fh} $rec;
559                 $num_exported++;
560             };
561             if ($@) {
562                 warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
563                 warn "... specific error is $@" if $verbose_logging;
564             }
565         }
566     }
567     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
568     print {$fh} $marcxml_close
569         unless $as_usmarc;
570
571     close $fh;
572     return $num_exported;
573 }
574
575 sub export_marc_records_from_list {
576     my ($record_type, $entries, $directory, $as_usmarc, $records_deleted) = @_;
577
578     my $num_exported = 0;
579     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
580
581     print {$fh} $marcxml_open
582         unless $as_usmarc;
583
584     my $i = 0;
585
586     # Skip any deleted records. We check for this anyway, but this reduces error spam
587     my %found = %$records_deleted;
588     foreach my $record_number ( map { $_->{biblio_auth_number} }
589                                 grep { !$found{ $_->{biblio_auth_number} }++ }
590                                 @$entries ) {
591         print "." if ( $verbose_logging );
592         print "\r$i" unless ($i++ %100 or !$verbose_logging);
593         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
594         if (defined $marc) {
595             eval {
596                 my $rec;
597                 if ( $as_usmarc ) {
598                     $rec = $marc->as_usmarc();
599                 } else {
600                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
601                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
602                 }
603                 print {$fh} $rec;
604                 $num_exported++;
605             };
606             if ($@) {
607               warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
608             }
609         }
610     }
611     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
612
613     print {$fh} $marcxml_close
614         unless $as_usmarc;
615
616     close $fh;
617     return $num_exported;
618 }
619
620 sub generate_deleted_marc_records {
621
622     my ($record_type, $entries, $directory, $as_usmarc) = @_;
623
624     my $records_deleted = {};
625     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
626
627     print {$fh} $marcxml_open
628         unless $as_usmarc;
629
630     my $i = 0;
631     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
632         print "\r$i" unless ($i++ %100 or !$verbose_logging);
633         print "." if ( $verbose_logging );
634
635         my $marc = MARC::Record->new();
636         if ($record_type eq 'biblio') {
637             fix_biblio_ids($marc, $record_number, $record_number);
638         } else {
639             fix_authority_id($marc, $record_number);
640         }
641         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
642             fix_unimarc_100($marc);
643         }
644
645         my $rec;
646         if ( $as_usmarc ) {
647             $rec = $marc->as_usmarc();
648         } else {
649             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
650             # Remove the record's XML header
651             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
652         }
653         print {$fh} $rec;
654
655         $records_deleted->{$record_number} = 1;
656     }
657     print "\nRecords exported: $i\n" if ( $verbose_logging );
658
659     print {$fh} $marcxml_close
660         unless $as_usmarc;
661
662     close $fh;
663     return $records_deleted;
664 }
665
666 sub get_corrected_marc_record {
667     my ($record_type, $record_number, $as_usmarc) = @_;
668
669     my $marc = get_raw_marc_record($record_type, $record_number, $as_usmarc);
670
671     if (defined $marc) {
672         fix_leader($marc);
673         if ($record_type eq 'authority') {
674             fix_authority_id($marc, $record_number);
675         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
676             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
677             $marc = $normalizer->process($marc);
678         }
679         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
680             fix_unimarc_100($marc);
681         }
682     }
683
684     return $marc;
685 }
686
687 sub get_raw_marc_record {
688     my ($record_type, $record_number, $as_usmarc) = @_;
689
690     my $marc;
691     if ($record_type eq 'biblio') {
692         if ($as_usmarc) {
693             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
694             $fetch_sth->execute($record_number);
695             if (my ($blob) = $fetch_sth->fetchrow_array) {
696                 $marc = MARC::Record->new_from_usmarc($blob);
697                 unless ($marc) {
698                     warn "error creating MARC::Record from $blob";
699                 }
700             }
701             # failure to find a bib is not a problem -
702             # a delete could have been done before
703             # trying to process a record update
704
705             $fetch_sth->finish();
706             return unless $marc;
707         } else {
708             eval { $marc = GetMarcBiblio($record_number, 1); };
709             if ($@ || !$marc) {
710                 # here we do warn since catching an exception
711                 # means that the bib was found but failed
712                 # to be parsed
713                 warn "error retrieving biblio $record_number";
714                 return;
715             }
716         }
717     } else {
718         eval { $marc = GetAuthority($record_number); };
719         if ($@) {
720             warn "error retrieving authority $record_number";
721             return;
722         }
723     }
724     return $marc;
725 }
726
727 sub fix_leader {
728     # FIXME - this routine is suspect
729     # It blanks the Leader/00-05 and Leader/12-16 to
730     # force them to be recalculated correct when
731     # the $marc->as_usmarc() or $marc->as_xml() is called.
732     # But why is this necessary?  It would be a serious bug
733     # in MARC::Record (definitely) and MARC::File::XML (arguably)
734     # if they are emitting incorrect leader values.
735     my $marc = shift;
736
737     my $leader = $marc->leader;
738     substr($leader,  0, 5) = '     ';
739     substr($leader, 10, 7) = '22     ';
740     $marc->leader(substr($leader, 0, 24));
741 }
742
743 sub fix_biblio_ids {
744     # FIXME - it is essential to ensure that the biblionumber is present,
745     #         otherwise, Zebra will choke on the record.  However, this
746     #         logic belongs in the relevant C4::Biblio APIs.
747     my $marc = shift;
748     my $biblionumber = shift;
749     my $biblioitemnumber;
750     if (@_) {
751         $biblioitemnumber = shift;
752     } else {
753         my $sth = $dbh->prepare(
754             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
755         $sth->execute($biblionumber);
756         ($biblioitemnumber) = $sth->fetchrow_array;
757         $sth->finish;
758         unless ($biblioitemnumber) {
759             warn "failed to get biblioitemnumber for biblio $biblionumber";
760             return 0;
761         }
762     }
763
764     # FIXME - this is cheating on two levels
765     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
766     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
767     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
768     #
769     # On the other hand, this better for now than what rebuild_zebra.pl used to
770     # do, which was duplicate the code for inserting the biblionumber
771     # and biblioitemnumber
772     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
773
774     return 1;
775 }
776
777 sub fix_authority_id {
778     # FIXME - as with fix_biblio_ids, the authid must be present
779     #         for Zebra's sake.  However, this really belongs
780     #         in C4::AuthoritiesMarc.
781     my ($marc, $authid) = @_;
782     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
783         $marc->delete_field($marc->field('001'));
784         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
785     }
786 }
787
788 sub fix_unimarc_100 {
789     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
790     my $marc = shift;
791
792     my $string;
793     my $length_100a = length($marc->subfield( 100, "a" ));
794     if (  $length_100a and $length_100a == 36 ) {
795         $string = $marc->subfield( 100, "a" );
796         my $f100 = $marc->field(100);
797         $marc->delete_field($f100);
798     }
799     else {
800         $string = POSIX::strftime( "%Y%m%d", localtime );
801         $string =~ s/\-//g;
802         $string = sprintf( "%-*s", 35, $string );
803     }
804     substr( $string, 22, 6, "frey50" );
805     $length_100a = length($marc->subfield( 100, "a" ));
806     unless ( $length_100a and $length_100a == 36 ) {
807         $marc->delete_field($marc->field(100));
808         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
809     }
810 }
811
812 sub do_indexing {
813     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
814
815     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
816     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
817     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
818     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
819
820     $noshadow //= '';
821
822     if ($noshadow or $reset_index) {
823         $noshadow = '-n';
824     }
825
826     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
827     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
828     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
829 }
830
831 sub _flock {
832     # test if flock is present; if so, use it; if not, return true
833     # op refers to the official flock operations including LOCK_EX,
834     # LOCK_UN, etc.
835     # combining LOCK_EX with LOCK_NB returns immediately
836     my ($fh, $op)= @_;
837     if( !defined($use_flock) ) {
838         #check if flock is present; if not, you will have a fatal error
839         my $lock_acquired = eval { flock($fh, $op) };
840         # assuming that $fh and $op are fine(..), an undef $lock_acquired
841         # means no flock
842         $use_flock = defined($lock_acquired) ? 1 : 0;
843         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
844         return 1 if !$use_flock;
845         return $lock_acquired;
846     } else {
847         return 1 if !$use_flock;
848         return flock($fh, $op);
849     }
850 }
851
852 sub _create_lockfile { #returns undef on failure
853     my $dir= shift;
854     unless (-d $dir) {
855         eval { mkpath($dir, 0, oct(755)) };
856         return if $@;
857     }
858     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
859     return ( $fh, $dir.'/'.LOCK_FILENAME );
860 }
861
862 sub print_usage {
863     print <<_USAGE_;
864 $0: reindex MARC bibs and/or authorities in Zebra.
865
866 Use this batch job to reindex all biblio or authority
867 records in your Koha database.
868
869 Parameters:
870
871     -b                      index bibliographic records
872
873     -a                      index authority records
874
875     -daemon                 Run in daemon mode.  The program will loop checking
876                             for entries on the zebraqueue table, processing
877                             them incrementally if present, and then sleep
878                             for a few seconds before repeating the process
879                             Checking the zebraqueue table is done with a cheap
880                             SQL query.  This allows for near realtime update of
881                             the zebra search index with low system overhead.
882                             Use -sleep to control the checking interval.
883
884                             Daemon mode implies -z, -a, -b.  The program will
885                             refuse to start if options are present that do not
886                             make sense while running as an incremental update
887                             daemon (e.g. -r or -offset).
888
889     -sleep 10               Seconds to sleep between checks of the zebraqueue
890                             table in daemon mode.  The default is 5 seconds.
891
892     -z                      select only updated and deleted
893                             records marked in the zebraqueue
894                             table.  Cannot be used with -r
895                             or -s.
896
897     --skip-deletes          only select record updates, not record
898                             deletions, to avoid potential excessive
899                             I/O when zebraidx processes deletions.
900                             If this option is used for normal indexing,
901                             a cronjob should be set up to run
902                             rebuild_zebra.pl -z without --skip-deletes
903                             during off hours.
904                             Only effective with -z.
905
906     -r                      clear Zebra index before
907                             adding records to index. Implies -w.
908
909     -d                      Temporary directory for indexing.
910                             If not specified, one is automatically
911                             created.  The export directory
912                             is automatically deleted unless
913                             you supply the -k switch.
914
915     -k                      Do not delete export directory.
916
917     -s                      Skip export.  Used if you have
918                             already exported the records
919                             in a previous run.
920
921     -noxml                  index from ISO MARC blob
922                             instead of MARC XML.  This
923                             option is recommended only
924                             for advanced user.
925
926     -nosanitize             export biblio/authority records directly from DB marcxml
927                             field without sanitizing records. It speed up
928                             dump process but could fail if DB contains badly
929                             encoded records. Works only with -x,
930
931     -w                      skip shadow indexing for this batch
932
933     -y                      do NOT clear zebraqueue after indexing; normally,
934                             after doing batch indexing, zebraqueue should be
935                             marked done for the affected record type(s) so that
936                             a running zebraqueue_daemon doesn't try to reindex
937                             the same records - specify -y to override this.
938                             Cannot be used with -z.
939
940     -v                      increase the amount of logging.  Normally only
941                             warnings and errors from the indexing are shown.
942                             Use log level 2 (-v -v) to include all Zebra logs.
943
944     --length   1234         how many biblio you want to export
945     --offset 1243           offset you want to start to
946                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
947                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
948     --where                 let you specify a WHERE query, like itemtype='BOOK'
949                             or something like that
950
951     --run-as-root           explicitily allow script to run as 'root' user
952
953     --wait-for-lock         when not running in daemon mode, the default
954                             behavior is to abort a rebuild if the rebuild
955                             lock is busy.  This option will cause the program
956                             to wait for the lock to free and then continue
957                             processing the rebuild request,
958
959     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
960                             biblioitems is the default value.
961
962     --help or -h            show this message.
963 _USAGE_
964 }