Turn on option low_mem (which need rewrite to use db/row) if there
[webpac2] / lib / WebPAC / Input.pm
1 package WebPAC::Input;
2
3 use warnings;
4 use strict;
5
6 use blib;
7
8 use WebPAC::Common;
9 use base qw/WebPAC::Common/;
10 use Data::Dumper;
11 use Encode qw/from_to/;
12
13 =head1 NAME
14
15 WebPAC::Input - read different file formats into WebPAC
16
17 =head1 VERSION
18
19 Version 0.13
20
21 =cut
22
23 our $VERSION = '0.13';
24
25 =head1 SYNOPSIS
26
27 This module implements input as database which have fixed and known
28 I<size> while indexing and single unique numeric identifier for database
29 position ranging from 1 to I<size>.
30
31 Simply, something that is indexed by unmber from 1 .. I<size>.
32
33 Examples of such databases are CDS/ISIS files, MARC files, lines in
34 text file, and so on.
35
36 Specific file formats are implemented using low-level interface modules,
37 located in C<WebPAC::Input::*> namespace which export C<open_db>,
38 C<fetch_rec> and optional C<init> functions.
39
40 Perhaps a little code snippet.
41
42         use WebPAC::Input;
43
44         my $db = WebPAC::Input->new(
45                 module => 'WebPAC::Input::ISIS',
46                 low_mem => 1,
47         );
48
49         $db->open( path => '/path/to/database' );
50         print "database size: ",$db->size,"\n";
51         while (my $rec = $db->fetch) {
52                 # do something with $rec
53         }
54
55
56
57 =head1 FUNCTIONS
58
59 =head2 new
60
61 Create new input database object.
62
63   my $db = new WebPAC::Input(
64         module => 'WebPAC::Input::MARC',
65         encoding => 'ISO-8859-2',
66         low_mem => 1,
67         recode => 'char pairs',
68         no_progress_bar => 1,
69   );
70
71 C<module> is low-level file format module. See L<WebPAC::Input::ISIS> and
72 L<WebPAC::Input::MARC>.
73
74 Optional parametar C<encoding> specify application code page (which will be
75 used internally). This should probably be your terminal encoding, and by
76 default, it C<ISO-8859-2>.
77
78 Default is not to use C<low_mem> options (see L<MEMORY USAGE> below).
79
80 C<recode> is optional string constisting of character or words pairs that
81 should be replaced in input stream.
82
83 C<no_progress_bar> disables progress bar output on C<STDOUT>
84
85 This function will also call low-level C<init> if it exists with same
86 parametars.
87
88 =cut
89
90 sub new {
91         my $class = shift;
92         my $self = {@_};
93         bless($self, $class);
94
95         my $log = $self->_get_logger;
96
97         $log->logconfess("code_page argument is not suppored any more. change it to encoding") if ($self->{lookup});
98         $log->logconfess("lookup argument is not suppored any more. rewrite call to lookup_ref") if ($self->{lookup});
99
100         $log->logconfess("specify low-level file format module") unless ($self->{module});
101         my $module_path = $self->{module};
102         $module_path =~ s#::#/#g;
103         $module_path .= '.pm';
104         $log->debug("require low-level module $self->{module} from $module_path");
105
106         require $module_path;
107
108         # check if required subclasses are implemented
109         foreach my $subclass (qw/open_db fetch_rec init dump_rec/) {
110                 # FIXME
111         }
112
113         $self->{'encoding'} ||= 'ISO-8859-2';
114
115         $self ? return $self : return undef;
116 }
117
118 =head2 open
119
120 This function will read whole database in memory and produce lookups.
121
122  $input->open(
123         path => '/path/to/database/file',
124         code_page => 'cp852',
125         limit => 500,
126         offset => 6000,
127         stats => 1,
128         lookup_coderef => sub {
129                 my $rec = shift;
130                 # store lookups
131         },
132         modify_records => {
133                 900 => { '^a' => { ' : ' => '^b' } },
134                 901 => { '*' => { '^b' => ' ; ' } },
135         },
136         modify_file => 'conf/modify/mapping.map',
137  );
138
139 By default, C<code_page> is assumed to be C<cp852>.
140
141 C<offset> is optional parametar to position at some offset before reading from database.
142
143 C<limit> is optional parametar to read just C<limit> records from database
144
145 C<stats> create optional report about usage of fields and subfields
146
147 C<lookup_coderef> is closure to called to save data into lookups
148
149 C<modify_records> specify mapping from subfields to delimiters or from
150 delimiters to subfields, as well as oprations on fields (if subfield is
151 defined as C<*>.
152
153 C<modify_file> is alternative for C<modify_records> above which preserves order and offers
154 (hopefully) simplier sintax than YAML or perl (see L</modify_file_regex>). This option
155 overrides C<modify_records> if both exists for same input.
156
157 Returns size of database, regardless of C<offset> and C<limit>
158 parametars, see also C<size>.
159
160 =cut
161
162 sub open {
163         my $self = shift;
164         my $arg = {@_};
165
166         my $log = $self->_get_logger();
167
168         $log->logconfess("lookup argument is not suppored any more. rewrite call to lookup_coderef") if ($arg->{lookup});
169         $log->logconfess("lookup_coderef must be CODE, not ",ref($arg->{lookup_coderef}))
170                 if ($arg->{lookup_coderef} && ref($arg->{lookup_coderef}) ne 'CODE');
171
172         $log->debug( $arg->{lookup_coderef} ? '' : 'not ', "using lookup_coderef");
173
174         $log->logcroak("need path") if (! $arg->{'path'});
175         my $code_page = $arg->{'code_page'} || 'cp852';
176
177         # store data in object
178         $self->{'input_code_page'} = $code_page;
179         foreach my $v (qw/path offset limit/) {
180                 $self->{$v} = $arg->{$v} if ($arg->{$v});
181         }
182
183         my $filter_ref;
184         my $recode_regex;
185         my $recode_map;
186
187         if ($self->{recode}) {
188                 my @r = split(/\s/, $self->{recode});
189                 if ($#r % 2 != 1) {
190                         $log->logwarn("recode needs even number of elements (some number of valid pairs)");
191                 } else {
192                         while (@r) {
193                                 my $from = shift @r;
194                                 my $to = shift @r;
195                                 $recode_map->{$from} = $to;
196                         }
197
198                         $recode_regex = join '|' => keys %{ $recode_map };
199
200                         $log->debug("using recode regex: $recode_regex");
201                 }
202
203         }
204
205         my $rec_regex;
206         if (my $p = $arg->{modify_file}) {
207                 $log->debug("using modify_file $p");
208                 $rec_regex = $self->modify_file_regexps( $p );
209         } elsif (my $h = $arg->{modify_records}) {
210                 $log->debug("using modify_records ", Dumper( $h ));
211                 $rec_regex = $self->modify_record_regexps(%{ $h });
212         }
213         $log->debug("rec_regex: ", Dumper($rec_regex)) if ($rec_regex);
214
215         my $class = $self->{module} || $log->logconfess("can't get low-level module name!");
216
217         my $ll_db = $class->new(
218                 path => $arg->{path},
219 #               filter => sub {
220 #                       my ($l,$f_nr) = @_;
221 #                       return unless defined($l);
222 #                       from_to($l, $code_page, $self->{'encoding'});
223 #                       $l =~ s/($recode_regex)/$recode_map->{$1}/g if ($recode_regex && $recode_map);
224 #                       return $l;
225 #               },
226                 %{ $arg },
227         );
228
229         unless (defined($ll_db)) {
230                 $log->logwarn("can't open database $arg->{path}, skipping...");
231                 return;
232         }
233
234         my $size = $ll_db->size;
235
236         unless ($size) {
237                 $log->logwarn("no records in database $arg->{path}, skipping...");
238                 return;
239         }
240
241         my $from_rec = 1;
242         my $to_rec = $size;
243
244         if (my $s = $self->{offset}) {
245                 $log->debug("skipping to MFN $s");
246                 $from_rec = $s;
247         } else {
248                 $self->{offset} = $from_rec;
249         }
250
251         if ($self->{limit}) {
252                 $log->debug("limiting to ",$self->{limit}," records");
253                 $to_rec = $from_rec + $self->{limit} - 1;
254                 $to_rec = $size if ($to_rec > $size);
255         }
256
257         # store size for later
258         $self->{size} = ($to_rec - $from_rec) ? ($to_rec - $from_rec + 1) : 0;
259
260         $log->info("processing $self->{size}/$size records [$from_rec-$to_rec] convert $code_page -> $self->{encoding}", $self->{stats} ? ' [stats]' : '');
261
262         # turn on low_mem for databases with more than 100000 records!
263         if (! $self->{low_mem} && $size > 100000) {
264                 $log->warn("Using on-disk storage instead of memory for input data. This will affect performance.");
265                 $self->{low_mem}++;
266         }
267
268         # running with low_mem flag? well, use DBM::Deep then.
269         if ($self->{'low_mem'}) {
270                 $log->info("running with low_mem which impacts performance (<32 Mb memory usage)");
271
272                 my $db_file = "data.db";
273
274                 if (-e $db_file) {
275                         unlink $db_file or $log->logdie("can't remove '$db_file' from last run");
276                         $log->debug("removed '$db_file' from last run");
277                 }
278
279                 require DBM::Deep;
280
281                 my $db = new DBM::Deep $db_file;
282
283                 $log->logdie("DBM::Deep error: $!") unless ($db);
284
285                 if ($db->error()) {
286                         $log->logdie("can't open '$db_file' under low_mem: ",$db->error());
287                 } else {
288                         $log->debug("using file '$db_file' for DBM::Deep");
289                 }
290
291                 $self->{'db'} = $db;
292         }
293
294         # read database
295         for (my $pos = $from_rec; $pos <= $to_rec; $pos++) {
296
297                 $log->debug("position: $pos\n");
298
299                 my $rec = $ll_db->fetch_rec($pos, sub {
300                                 my ($l,$f_nr) = @_;
301 #                               return unless defined($l);
302 #                               return $l unless ($rec_regex && $f_nr);
303
304                                 $log->debug("-=> $f_nr ## $l");
305
306                                 # codepage conversion and recode_regex
307                                 from_to($l, $code_page, $self->{'encoding'});
308                                 $l =~ s/($recode_regex)/$recode_map->{$1}/g if ($recode_regex && $recode_map);
309
310                                 # apply regexps
311                                 if ($rec_regex && defined($rec_regex->{$f_nr})) {
312                                         $log->logconfess("regexps->{$f_nr} must be ARRAY") if (ref($rec_regex->{$f_nr}) ne 'ARRAY');
313                                         my $c = 0;
314                                         foreach my $r (@{ $rec_regex->{$f_nr} }) {
315                                                 my $old_l = $l;
316                                                 eval '$l =~ ' . $r;
317                                                 if ($old_l ne $l) {
318                                                         $log->debug("REGEX on $f_nr eval \$l =~ $r\n## old l: [$old_l]\n## new l: [$l]");
319                                                 }
320                                                 $log->error("error applying regex: $r") if ($@);
321                                         }
322                                 }
323
324                                 $log->debug("<=- $f_nr ## $l");
325                                 return $l;
326                 });
327
328                 $log->debug(sub { Dumper($rec) });
329
330                 if (! $rec) {
331                         $log->warn("record $pos empty? skipping...");
332                         next;
333                 }
334
335                 # store
336                 if ($self->{low_mem}) {
337                         $self->{db}->put($pos, $rec);
338                 } else {
339                         $self->{data}->{$pos} = $rec;
340                 }
341
342                 # create lookup
343                 $arg->{'lookup_coderef'}->( $rec ) if ($rec && $arg->{'lookup_coderef'});
344
345                 # update counters for statistics
346                 if ($self->{stats}) {
347
348                         # fetch clean record with regexpes applied for statistics
349                         my $rec = $ll_db->fetch_rec($pos);
350
351                         foreach my $fld (keys %{ $rec }) {
352                                 $self->{_stats}->{fld}->{ $fld }++;
353
354                                 $log->logdie("invalid record fild $fld, not ARRAY")
355                                         unless (ref($rec->{ $fld }) eq 'ARRAY');
356         
357                                 foreach my $row (@{ $rec->{$fld} }) {
358
359                                         if (ref($row) eq 'HASH') {
360
361                                                 foreach my $sf (keys %{ $row }) {
362                                                         next if ($sf eq 'subfields');
363                                                         $self->{_stats}->{sf}->{ $fld }->{ $sf }->{count}++;
364                                                         $self->{_stats}->{sf}->{ $fld }->{ $sf }->{repeatable}++
365                                                                         if (ref($row->{$sf}) eq 'ARRAY');
366                                                 }
367
368                                         } else {
369                                                 $self->{_stats}->{repeatable}->{ $fld }++;
370                                         }
371                                 }
372                         }
373                 }
374
375                 $self->progress_bar($pos,$to_rec) unless ($self->{no_progress_bar});
376
377         }
378
379         $self->{pos} = -1;
380         $self->{last_pcnt} = 0;
381
382         # store max mfn and return it.
383         $self->{max_pos} = $to_rec;
384         $log->debug("max_pos: $to_rec");
385
386         # save for dump
387         $self->{ll_db} = $ll_db;
388
389         return $size;
390 }
391
392 =head2 fetch
393
394 Fetch next record from database. It will also displays progress bar.
395
396  my $rec = $isis->fetch;
397
398 Record from this function should probably go to C<data_structure> for
399 normalisation.
400
401 =cut
402
403 sub fetch {
404         my $self = shift;
405
406         my $log = $self->_get_logger();
407
408         $log->logconfess("it seems that you didn't load database!") unless ($self->{pos});
409
410         if ($self->{pos} == -1) {
411                 $self->{pos} = $self->{offset};
412         } else {
413                 $self->{pos}++;
414         }
415
416         my $mfn = $self->{pos};
417
418         if ($mfn > $self->{max_pos}) {
419                 $self->{pos} = $self->{max_pos};
420                 $log->debug("at EOF");
421                 return;
422         }
423
424         $self->progress_bar($mfn,$self->{max_pos}) unless ($self->{no_progress_bar});
425
426         my $rec;
427
428         if ($self->{low_mem}) {
429                 $rec = $self->{db}->get($mfn);
430         } else {
431                 $rec = $self->{data}->{$mfn};
432         }
433
434         $rec ||= 0E0;
435 }
436
437 =head2 pos
438
439 Returns current record number (MFN).
440
441  print $isis->pos;
442
443 First record in database has position 1.
444
445 =cut
446
447 sub pos {
448         my $self = shift;
449         return $self->{pos};
450 }
451
452
453 =head2 size
454
455 Returns number of records in database
456
457  print $isis->size;
458
459 Result from this function can be used to loop through all records
460
461  foreach my $mfn ( 1 ... $isis->size ) { ... }
462
463 because it takes into account C<offset> and C<limit>.
464
465 =cut
466
467 sub size {
468         my $self = shift;
469         return $self->{size};
470 }
471
472 =head2 seek
473
474 Seek to specified MFN in file.
475
476  $isis->seek(42);
477
478 First record in database has position 1.
479
480 =cut
481
482 sub seek {
483         my $self = shift;
484         my $pos = shift || return;
485
486         my $log = $self->_get_logger();
487
488         if ($pos < 1) {
489                 $log->warn("seek before first record");
490                 $pos = 1;
491         } elsif ($pos > $self->{max_pos}) {
492                 $log->warn("seek beyond last record");
493                 $pos = $self->{max_pos};
494         }
495
496         return $self->{pos} = (($pos - 1) || -1);
497 }
498
499 =head2 stats
500
501 Dump statistics about field and subfield usage
502
503   print $input->stats;
504
505 =cut
506
507 sub stats {
508         my $self = shift;
509
510         my $log = $self->_get_logger();
511
512         my $s = $self->{_stats};
513         if (! $s) {
514                 $log->warn("called stats, but there is no statistics collected");
515                 return;
516         }
517
518         my $max_fld = 0;
519
520         my $out = join("\n",
521                 map {
522                         my $f = $_ || die "no field";
523                         my $v = $s->{fld}->{$f} || die "no s->{fld}->{$f}";
524                         $max_fld = $v if ($v > $max_fld);
525
526                         my $o = sprintf("%4s %d ~", $f, $v);
527
528                         if (defined($s->{sf}->{$f})) {
529                                 map {
530                                         $o .= sprintf(" %s:%d%s", $_, 
531                                                 $s->{sf}->{$f}->{$_}->{count},
532                                                 $s->{sf}->{$f}->{$_}->{repeatable} ? '*' : '',
533                                         );
534                                 } sort keys %{ $s->{sf}->{$f} };
535                         }
536
537                         if (my $v_r = $s->{repeatable}->{$f}) {
538                                 $o .= " ($v_r)" if ($v_r != $v);
539                         }
540
541                         $o;
542                 } sort { $a cmp $b } keys %{ $s->{fld} }
543         );
544
545         $log->debug( sub { Dumper($s) } );
546
547         return $out;
548 }
549
550 =head2 dump
551
552 Display humanly readable dump of record
553
554 =cut
555
556 sub dump {
557         my $self = shift;
558
559         return $self->{ll_db}->dump_rec( $self->{pos} );
560
561 }
562
563 =head2 modify_record_regexps
564
565 Generate hash with regexpes to be applied using l<filter>.
566
567   my $regexpes = $input->modify_record_regexps(
568                 900 => { '^a' => { ' : ' => '^b' } },
569                 901 => { '*' => { '^b' => ' ; ' } },
570   );
571
572 =cut
573
574 sub _get_regex {
575         my ($sf,$from,$to) = @_;
576         if ($sf =~ /^\^/) {
577                 return
578                         's/\Q'. $sf .'\E([^\^]*?)\Q'. $from .'\E([^\^]*?)/'. $sf .'$1'. $to .'$2/';
579         } else {
580                 return
581                         's/\Q'. $from .'\E/'. $to .'/g';
582         }
583 }
584
585 sub modify_record_regexps {
586         my $self = shift;
587         my $modify_record = {@_};
588
589         my $regexpes;
590
591         my $log = $self->_get_logger();
592
593         foreach my $f (keys %$modify_record) {
594                 $log->debug("field: $f");
595
596                 foreach my $sf (keys %{ $modify_record->{$f} }) {
597                         $log->debug("subfield: $sf");
598
599                         foreach my $from (keys %{ $modify_record->{$f}->{$sf} }) {
600                                 my $to = $modify_record->{$f}->{$sf}->{$from};
601                                 #die "no field?" unless defined($to);
602                                 $log->debug("transform: |$from| -> |$to|");
603
604                                 my $regex = _get_regex($sf,$from,$to);
605                                 push @{ $regexpes->{$f} }, $regex;
606                                 $log->debug("regex: $regex");
607                         }
608                 }
609         }
610
611         return $regexpes;
612 }
613
614 =head2 modify_file_regexps
615
616 Generate hash with regexpes to be applied using l<filter> from
617 pseudo hash/yaml format for regex mappings.
618
619 It should be obvious:
620
621         200
622           '^a'
623             ' : ' => '^e'
624             ' = ' => '^d'
625
626 In field I<200> find C<'^a'> and then C<' : '>, and replace it with C<'^e'>.
627 In field I<200> find C<'^a'> and then C<' = '>, and replace it with C<'^d'>.
628
629   my $regexpes = $input->modify_file_regexps( 'conf/modify/common.pl' );
630
631 On undef path it will just return.
632
633 =cut
634
635 sub modify_file_regexps {
636         my $self = shift;
637
638         my $modify_path = shift || return;
639
640         my $log = $self->_get_logger();
641
642         my $regexpes;
643
644         CORE::open(my $fh, $modify_path) || $log->logdie("can't open modify file $modify_path: $!");
645
646         my ($f,$sf);
647
648         while(<$fh>) {
649                 chomp;
650                 next if (/^#/ || /^\s*$/);
651
652                 if (/^\s*(\d+)\s*$/) {
653                         $f = $1;
654                         $log->debug("field: $f");
655                         next;
656                 } elsif (/^\s*'([^']*)'\s*$/) {
657                         $sf = $1;
658                         $log->die("can't define subfiled before field in: $_") unless ($f);
659                         $log->debug("subfield: $sf");
660                 } elsif (/^\s*'([^']*)'\s*=>\s*'([^']*)'\s*$/) {
661                         my ($from,$to) = ($1, $2);
662
663                         $log->debug("transform: |$from| -> |$to|");
664
665                         my $regex = _get_regex($sf,$from,$to);
666                         push @{ $regexpes->{$f} }, $regex;
667                         $log->debug("regex: $regex");
668                 }
669         }
670
671         return $regexpes;
672 }
673
674 =head1 MEMORY USAGE
675
676 C<low_mem> options is double-edged sword. If enabled, WebPAC
677 will run on memory constraint machines (which doesn't have enough
678 physical RAM to create memory structure for whole source database).
679
680 If your machine has 512Mb or more of RAM and database is around 10000 records,
681 memory shouldn't be an issue. If you don't have enough physical RAM, you
682 might consider using virtual memory (if your operating system is handling it
683 well, like on FreeBSD or Linux) instead of dropping to L<DBM::Deep> to handle
684 parsed structure of ISIS database (this is what C<low_mem> option does).
685
686 Hitting swap at end of reading source database is probably o.k. However,
687 hitting swap before 90% will dramatically decrease performance and you will
688 be better off with C<low_mem> and using rest of availble memory for
689 operating system disk cache (Linux is particuallary good about this).
690 However, every access to database record will require disk access, so
691 generation phase will be slower 10-100 times.
692
693 Parsed structures are essential - you just have option to trade RAM memory
694 (which is fast) for disk space (which is slow). Be sure to have planty of
695 disk space if you are using C<low_mem> and thus L<DBM::Deep>.
696
697 However, when WebPAC is running on desktop machines (or laptops :-), it's
698 highly undesireable for system to start swapping. Using C<low_mem> option can
699 reduce WecPAC memory usage to around 64Mb for same database with lookup
700 fields and sorted indexes which stay in RAM. Performance will suffer, but
701 memory usage will really be minimal. It might be also more confortable to
702 run WebPAC reniced on those machines.
703
704
705 =head1 AUTHOR
706
707 Dobrica Pavlinusic, C<< <dpavlin@rot13.org> >>
708
709 =head1 COPYRIGHT & LICENSE
710
711 Copyright 2005-2006 Dobrica Pavlinusic, All Rights Reserved.
712
713 This program is free software; you can redistribute it and/or modify it
714 under the same terms as Perl itself.
715
716 =cut
717
718 1; # End of WebPAC::Input