r1020@llin: dpavlin | 2006-09-26 14:40:34 +0200
[webpac2] / run.pl
diff --git a/run.pl b/run.pl
index 3ac7fb2..3b01e3f 100755 (executable)
--- a/run.pl
+++ b/run.pl
@@ -7,20 +7,23 @@ use File::Temp qw/tempdir/;
 use lib './lib';
 
 use WebPAC::Common 0.02;
-use WebPAC::Lookup;
-use WebPAC::Input 0.03;
-use WebPAC::Store 0.03;
-use WebPAC::Normalize;
+use WebPAC::Parser 0.04;
+use WebPAC::Input 0.13;
+use WebPAC::Store 0.11;
+use WebPAC::Normalize 0.11;
 use WebPAC::Output::TT;
-use WebPAC::Validate;
-use YAML qw/LoadFile/;
+use WebPAC::Validate 0.06;
+use WebPAC::Output::MARC;
+use WebPAC::Config;
 use Getopt::Long;
 use File::Path;
 use Time::HiRes qw/time/;
 use File::Slurp;
-use MARC::Record 2.0;  # need 2.0 for utf-8 encoding see marcpm.sf.net
-use MARC::Lint;
 use Data::Dump qw/dump/;
+use Storable qw/dclone/;
+
+use Proc::Queue size => 1;
+use POSIX ":sys_wait_h"; # imports WNOHANG
 
 =head1 NAME
 
@@ -57,8 +60,8 @@ path to YAML configuration file
 
 =item --stats
 
-disable indexing and dump statistics about field and subfield
-usage for each input
+disable indexing, modify_* in configuration and dump statistics about field
+and subfield usage for each input
 
 =item --validate path/to/validation_file
 
@@ -77,6 +80,23 @@ Optional path to output file
 By default turned on if C<--marc-normalize> is used. You can disable lint
 messages with C<--no-marc-lint>.
 
+=item --marc-dump
+
+Force dump or input and marc record for debugging.
+
+=item --parallel 4
+
+Run databases in parallel (aproximatly same as number of processors in
+machine if you want to use full load)
+
+=item --only-links
+
+Create just links
+
+=item --merge
+
+Create merged index of databases which have links
+
 =back
 
 =cut
@@ -85,13 +105,19 @@ my $offset;
 my $limit;
 
 my $clean = 0;
-my $config = 'conf/config.yml';
+my $config_path;
 my $debug = 0;
 my $only_filter;
 my $stats = 0;
 my $validate_path;
 my ($marc_normalize, $marc_output);
 my $marc_lint = 1;
+my $marc_dump = 0;
+my $parallel = 0;
+my $only_links = 0;
+my $merge = 0;
+
+my $log = _new WebPAC::Common()->_get_logger();
 
 GetOptions(
        "limit=i" => \$limit,
@@ -99,30 +125,45 @@ GetOptions(
        "clean" => \$clean,
        "one=s" => \$only_filter,
        "only=s" => \$only_filter,
-       "config" => \$config,
-       "debug" => \$debug,
+       "config" => \$config_path,
+       "debug+" => \$debug,
        "stats" => \$stats,
        "validate=s" => \$validate_path,
        "marc-normalize=s" => \$marc_normalize,
        "marc-output=s" => \$marc_output,
        "marc-lint!" => \$marc_lint,
+       "marc-dump!" => \$marc_dump,
+       "parallel=i" => \$parallel,
+       "only-links!" => \$only_links,
+       "merge" => \$merge,
 );
 
-$config = LoadFile($config);
+my $config = new WebPAC::Config( path => $config_path );
 
-print "config = ",dump($config) if ($debug);
+#print "config = ",dump($config) if ($debug);
 
-die "no databases in config file!\n" unless ($config->{databases});
+die "no databases in config file!\n" unless ($config->databases);
 
-my $log = _new WebPAC::Common()->_get_logger();
 $log->info( "-" x 79 );
 
+
+my $estcmd_fh;
+my $estcmd_path = './estcmd-merge.sh';
+if ($merge) {
+       open($estcmd_fh, '>', $estcmd_path) || $log->logdie("can't open $estcmd_path: $!");
+       print $estcmd_fh 'cd /data/estraier/_node/ || exit 1',$/;
+       print $estcmd_fh 'sudo /etc/init.d/hyperestraier stop',$/;
+       $log->info("created merge batch file $estcmd_path");
+}
+
+
 my $validate;
 $validate = new WebPAC::Validate(
        path => $validate_path,
 ) if ($validate_path);
 
-my $use_indexer = $config->{use_indexer} || 'hyperestraier';
+
+my $use_indexer = $config->use_indexer;
 if ($stats) {
        $log->debug("option --stats disables update of indexing engine...");
        $use_indexer = undef;
@@ -133,31 +174,69 @@ if ($stats) {
 # disable indexing when creating marc
 $use_indexer = undef if ($marc_normalize);
 
+# parse normalize files and create source files for lookup and normalization
+
+my $parser = new WebPAC::Parser( config => $config );
+
 my $total_rows = 0;
 my $start_t = time();
 
 my @links;
-my $indexer;
 
-my $lint = new MARC::Lint if ($marc_lint);
+if ($parallel) {
+       $log->info("Using $parallel processes for speedup");
+       Proc::Queue::size($parallel);
+}
+
+sub create_ds_config {
+       my ($db_config, $database, $input, $mfn) = @_;
+       my $c = dclone( $db_config );
+       $c->{_} = $database || $log->logconfess("need database");
+       $c->{_mfn} = $mfn || $log->logconfess("need mfn");
+       $c->{input} = $input || $log->logconfess("need input");
+       return $c;
+}
 
-while (my ($database, $db_config) = each %{ $config->{databases} }) {
+while (my ($database, $db_config) = each %{ $config->databases }) {
 
        my ($only_database,$only_input) = split(m#/#, $only_filter) if ($only_filter);
        next if ($only_database && $database !~ m/$only_database/i);
 
+       if ($parallel) {
+               my $f=fork;
+               if(defined ($f) and $f==0) {
+                       $log->info("Created processes $$ for speedup");
+               } else {
+                       next;
+               }
+       }
+
+       my $indexer;
        if ($use_indexer) {
-               my $indexer_config = $config->{$use_indexer} || $log->logdie("can't find '$use_indexer' part in confguration");
+
+               my $cfg_name = $use_indexer;
+               $cfg_name =~ s/\-.*$//;
+
+               my $indexer_config = $config->get( $cfg_name ) || $log->logdie("can't find '$cfg_name' part in confguration");
                $indexer_config->{database} = $database;
                $indexer_config->{clean} = $clean;
                $indexer_config->{label} = $db_config->{name};
 
+               # force clean if database has links
+               $indexer_config->{clean} = 1 if ($db_config->{links});
+
                if ($use_indexer eq 'hyperestraier') {
 
                        # open Hyper Estraier database
                        use WebPAC::Output::Estraier '0.10';
                        $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );
                
+               } elsif ($use_indexer eq 'hyperestraier-native') {
+
+                       # open Hyper Estraier database
+                       use WebPAC::Output::EstraierNative;
+                       $indexer = new WebPAC::Output::EstraierNative( %{ $indexer_config } );
+
                } elsif ($use_indexer eq 'kinosearch') {
 
                        # open KinoSearch
@@ -173,13 +252,40 @@ while (my ($database, $db_config) = each %{ $config->{databases} }) {
        }
 
 
+       #
+       # store Hyper Estraier links to other databases
+       #
+       if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
+               foreach my $link (@{ $db_config->{links} }) {
+                       if ($use_indexer eq 'hyperestraier') {
+                               if ($merge) {
+                                       print $estcmd_fh 'sudo -u www-data estcmd merge ' . $database . ' ' . $link->{to},$/;
+                               } else {
+                                       $log->info("saving link $database -> $link->{to} [$link->{credit}]");
+                                       push @links, sub {
+                                               $log->info("adding link $database -> $link->{to} [$link->{credit}]");
+                                               $indexer->add_link(
+                                                       from => $database,
+                                                       to => $link->{to},
+                                                       credit => $link->{credit},
+                                               );
+                                       };
+                               }
+                       } else {
+                               $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
+                       }
+               }
+       }
+       next if ($only_links);
+
+
        #
        # now WebPAC::Store
        #
        my $abs_path = abs_path($0);
        $abs_path =~ s#/[^/]*$#/#;
 
-       my $db_path = $config->{webpac}->{db_path} . '/' . $database;
+       my $db_path = $config->webpac('db_path');
 
        if ($clean) {
                $log->info("creating new database '$database' in $db_path");
@@ -188,9 +294,8 @@ while (my ($database, $db_config) = each %{ $config->{databases} }) {
                $log->info("working on database '$database' in $db_path");
        }
 
-       my $db = new WebPAC::Store(
+       my $store = new WebPAC::Store(
                path => $db_path,
-               database => $database,
                debug => $debug,
        );
 
@@ -208,50 +313,113 @@ while (my ($database, $db_config) = each %{ $config->{databases} }) {
                $log->info("database $database doesn't have inputs defined");
        }
 
-       my @supported_inputs = keys %{ $config->{webpac}->{inputs} };
-
        foreach my $input (@inputs) {
 
-               next if ($only_input && ($input->{name} !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));
+               my $input_name = $input->{name} || $log->logdie("input without a name isn't valid: ",dump($input));
+
+               next if ($only_input && ($input_name !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));
 
                my $type = lc($input->{type});
 
-               die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));
+               die "I know only how to handle input types ", join(",", $config->webpac('inputs') ), " not '$type'!\n" unless (grep(/$type/, $config->webpac('inputs')));
 
-               my $lookup = new WebPAC::Lookup(
-                       lookup_file => $input->{lookup},
-               ) if ($input->{lookup});
+               my $input_module = $config->webpac('inputs')->{$type};
 
-               my $input_module = $config->{webpac}->{inputs}->{$type};
+               my @lookups = $parser->have_lookup_create($database, $input);
 
-               $log->info("working on input '$input->{name}' in $input->{path} [type: $input->{type}] using $input_module",
-                       $input->{lookup} ? "lookup '$input->{lookup}'" : ""
+               $log->info("working on input '$input_name' in $input->{path} [type: $input->{type}] using $input_module",
+                       @lookups ? " creating lookups: ".join(", ", @lookups) : ""
                );
 
+               if ($stats) {
+                       # disable modification of records if --stats is in use
+                       delete($input->{modify_records});
+                       delete($input->{modify_file});
+               }
+
                my $input_db = new WebPAC::Input(
                        module => $input_module,
-                       code_page => $config->{webpac}->{webpac_encoding},
+                       encoding => $config->webpac('webpac_encoding'),
                        limit => $limit || $input->{limit},
                        offset => $offset,
-                       lookup => $lookup,
                        recode => $input->{recode},
                        stats => $stats,
+                       modify_records => $input->{modify_records},
+                       modify_file => $input->{modify_file},
                );
                $log->logdie("can't create input using $input_module") unless ($input);
 
+               if (defined( $input->{lookup} )) {
+                       $log->warn("$database/$input_name has depriciated lookup definition, removing it...");
+                       delete( $input->{lookup} );
+               }
+
+               my $lookup;
+               my $lookup_coderef;
+
+               if (@lookups) {
+
+                       my $rules = $parser->lookup_create_rules($database, $input) || $log->logdie("no rules found for $database/$input");
+
+                       $lookup_coderef = sub {
+                               my $rec = shift || die "need rec!";
+                               my $mfn = $rec->{'000'}->[0] || die "need mfn in 000";
+
+                               WebPAC::Normalize::data_structure(
+                                       row => $rec,
+                                       rules => $rules,
+                                       lookup => $lookup,
+                                       config => create_ds_config( $db_config, $database, $input, $mfn ),
+                               );
+
+                               warn "current lookup = ", dump($lookup) if ($lookup);
+                       };
+
+                       WebPAC::Normalize::_set_lookup( undef );
+
+                       $log->debug("created lookup_coderef using:\n$rules");
+
+               };
+
                my $maxmfn = $input_db->open(
                        path => $input->{path},
                        code_page => $input->{encoding},        # database encoding
+                       lookup_coderef => $lookup_coderef,
                        %{ $input },
                );
 
+               my $lookup_data = WebPAC::Normalize::_get_lookup();
+
+               if (defined( $lookup_data->{$database}->{$input_name} )) {
+                       $log->debug("created following lookups: ", dump( $lookup_data ));
+
+                       foreach my $key (keys %{ $lookup_data->{$database}->{$input_name} }) {
+                               $store->save_lookup(
+                                       database => $database,
+                                       input => $input_name,
+                                       key => $key,
+                                       data => $lookup_data->{$database}->{$input_name}->{$key},
+                               );
+                       }
+               }
+
+               my $report_fh;
+               if ($stats || $validate) {
+                       my $path = "out/report/${database}-${input_name}.txt";
+                       open($report_fh, '>', $path) || $log->logdie("can't open $path: $!");
+
+                       print $report_fh "Report for database '$database' input '$input_name' records ",
+                               $offset || 1, "-", $limit || $input->{limit} || $maxmfn, "\n\n";
+                       $log->info("Generating report file $path");
+               }
+
                my @norm_array = ref($input->{normalize}) eq 'ARRAY' ?
                        @{ $input->{normalize} } : ( $input->{normalize} );
 
                if ($marc_normalize) {
                        @norm_array = ( {
                                path => $marc_normalize,
-                               output => $marc_output || 'out/marc/' . $database . '-' . $input->{name} . '.marc',
+                               output => $marc_output || "out/marc/${database}-${input_name}.marc",
                        } );
                }
 
@@ -265,17 +433,19 @@ while (my ($database, $db_config) = each %{ $config->{databases} }) {
 
                        $log->info("Using $normalize_path for normalization...");
 
-                       my $marc_fh;
-                       if (my $path = $normalize->{output}) {
-                               open($marc_fh, '>', $path) ||
-                                       $log->logdie("can't open MARC output $path: $!");
-
-                               $log->info("Creating MARC export file $path", $marc_lint ? ' (with lint)' : '', "\n");
-                       }
+                       my $marc = new WebPAC::Output::MARC(
+                               path => $normalize->{output},
+                               lint => $marc_lint,
+                               dump => $marc_dump,
+                       ) if ($normalize->{output});
 
                        # reset position in database
                        $input_db->seek(1);
 
+                       # generate name of config key for indexer (strip everything after -)
+                       my $indexer_config = $use_indexer;
+                       $indexer_config =~ s/^(\w+)-?.*$/$1/g if ($indexer_config);
+
                        foreach my $pos ( 0 ... $input_db->size ) {
 
                                my $row = $input_db->fetch || next;
@@ -290,59 +460,72 @@ while (my ($database, $db_config) = each %{ $config->{databases} }) {
 
 
                                if ($validate) {
-                                       my @errors = $validate->validate_errors( $row );
-                                       $log->error( "MFN $mfn validation errors:\n", join("\n", @errors) ) if (@errors);
+                                       if ( my $errors = $validate->validate_errors( $row, $input_db->dump ) ) {
+                                               $log->error( "MFN $mfn validation error:\n",
+                                                       $validate->report_error( $errors )
+                                               );
+                                       }
                                }
 
-                                       
                                my $ds = WebPAC::Normalize::data_structure(
                                        row => $row,
                                        rules => $rules,
                                        lookup => $lookup ? $lookup->lookup_hash : undef,
+                                       config => create_ds_config( $db_config, $database, $input, $mfn ),
                                        marc_encoding => 'utf-8',
                                );
 
-                               $db->save_ds(
+                               $store->save_ds(
+                                       database => $database,
+                                       input => $input_name,
                                        id => $mfn,
                                        ds => $ds,
-                                       prefix => $input->{name},
                                ) if ($ds && !$stats);
 
                                $indexer->add(
-                                       id => $input->{name} . "/" . $mfn,
+                                       id => "${input_name}/${mfn}",
                                        ds => $ds,
-                                       type => $config->{$use_indexer}->{type},
+                                       type => $config->get($indexer_config)->{type},
                                ) if ($indexer && $ds);
 
-                               if ($marc_fh) {
-                                       my $marc = new MARC::Record;
-                                       $marc->encoding( 'utf-8' );
-                                       my @marc_fields = WebPAC::Normalize::_get_marc_fields();
-                                       if (! @marc_fields) {
-                                               $log->warn("MARC record $mfn is empty, skipping");
-                                       } else {
-                                               $marc->add_fields( @marc_fields );
-                                               if ($marc_lint) {
-                                                       $lint->check_record( $marc );
-                                                       my $err = join( "\n", $lint->warnings );
-                                                       $log->error("MARC lint detected warning on MFN $mfn\n",
-                                                               "Original imput row: ",dump($row), "\n",
-                                                               "Normalized MARC row: ",dump(@marc_fields), "\n",
-                                                               "MARC lint warnings: ",$err,"\n"
-                                                       ) if ($err);
-                                               }
-                                               print $marc_fh $marc->as_usmarc;
+                               if ($marc) {
+                                       my $i = 0;
+
+                                       while (my $fields = WebPAC::Normalize::_get_marc_fields( fetch_next => 1 ) ) {
+                                               $marc->add(
+                                                       id => $mfn . ( $i ? "/$i" : '' ),
+                                                       fields => $fields,
+                                                       leader => WebPAC::Normalize::marc_leader(),
+                                                       row => $row,
+                                               );
+                                               $i++;
                                        }
+
+                                       $log->info("Created $i instances of MFN $mfn\n") if ($i > 1);
                                }
 
                                $total_rows++;
                        }
 
-                       $log->info("statistics of fields usage:\n", $input_db->stats) if ($stats);
+                       if ($validate) {
+                               my $errors = $validate->report;
+                               if ($errors) {
+                                       $log->info("validation errors:\n$errors\n" );
+                                       print $report_fh "$errors\n" if ($report_fh);
+                               }
+                       }
+
+                       if ($stats) {
+                               my $s = $input_db->stats;
+                               $log->info("statistics of fields usage:\n$s");
+                               print $report_fh "Statistics of fields usage:\n$s" if ($report_fh);
+                       }
 
                        # close MARC file
-                       close($marc_fh) if ($marc_fh);
+                       $marc->finish if ($marc);
 
+                       # close report
+                       close($report_fh) if ($report_fh)
                }
 
        }
@@ -356,27 +539,33 @@ while (my ($database, $db_config) = each %{ $config->{databases} }) {
                )
        );
 
-       #
-       # add Hyper Estraier links to other databases
-       #
-       if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
-               foreach my $link (@{ $db_config->{links} }) {
-                       if ($use_indexer eq 'hyperestraier') {
-                               $log->info("saving link $database -> $link->{to} [$link->{credit}]");
-                               push @links, {
-                                       from => $database,
-                                       to => $link->{to},
-                                       credit => $link->{credit},
-                               };
-                       } else {
-                               $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
-                       }
-               }
+
+       # end forked process
+       if ($parallel) {
+               $log->info("parallel process $$ finished");
+               exit(0);
        }
 
 }
 
-foreach my $link (@links) {
-       $log->info("adding link $link->{from} -> $link->{to} [$link->{credit}]");
-       $indexer->add_link( %{ $link } );
+if ($parallel) {
+       # wait all children to finish
+       sleep(1) while wait != -1;
+       $log->info("all parallel processes finished");
+}
+
+#
+# handle links or merge after indexing
+#
+
+if ($merge) {
+       print $estcmd_fh 'sudo /etc/init.d/hyperestraier start',$/;
+       close($estcmd_fh);
+       chmod 0700, $estcmd_path || $log->warn("can't chmod 0700 $estcmd_path: $!");
+       system $estcmd_path;
+} else {
+       foreach my $link (@links) {
+               $log->logdie("coderef in link ", Dumper($link), " is ", ref($link), " and not CODE") unless (ref($link) eq 'CODE');
+               $link->();
+       }
 }