use debug level to, well, display debug
[webpac2] / lib / WebPAC / Normalize.pm
1 package WebPAC::Normalize;
2 use Exporter 'import';
3 @EXPORT = qw/
4         _set_rec _set_lookup
5         _set_load_row
6         _get_ds _clean_ds
7         _debug
8         _pack_subfields_hash
9
10         tag search display
11         marc marc_indicators marc_repeatable_subfield
12         marc_compose marc_leader marc_fixed
13         marc_duplicate marc_remove marc_count
14         marc_original_order
15
16         rec1 rec2 rec
17         regex prefix suffix surround
18         first lookup join_with
19         save_into_lookup
20
21         split_rec_on
22
23         get set
24         count
25 /;
26
27 use warnings;
28 use strict;
29
30 #use base qw/WebPAC::Common/;
31 use Data::Dump qw/dump/;
32 use Storable qw/dclone/;
33 use Carp qw/confess/;
34
35 # debugging warn(s)
36 my $debug = 0;
37
38
39 =head1 NAME
40
41 WebPAC::Normalize - describe normalisaton rules using sets
42
43 =head1 VERSION
44
45 Version 0.28
46
47 =cut
48
49 our $VERSION = '0.28';
50
51 =head1 SYNOPSIS
52
53 This module uses C<conf/normalize/*.pl> files to perform normalisation
54 from input records using perl functions which are specialized for set
55 processing.
56
57 Sets are implemented as arrays, and normalisation file is valid perl, which
58 means that you check it's validity before running WebPAC using
59 C<perl -c normalize.pl>.
60
61 Normalisation can generate multiple output normalized data. For now, supported output
62 types (on the left side of definition) are: C<tag>, C<display>, C<search> and
63 C<marc>.
64
65 =head1 FUNCTIONS
66
67 Functions which start with C<_> are private and used by WebPAC internally.
68 All other functions are available for use within normalisation rules.
69
70 =head2 data_structure
71
72 Return data structure
73
74   my $ds = WebPAC::Normalize::data_structure(
75         lookup => $lookup_hash,
76         row => $row,
77         rules => $normalize_pl_config,
78         marc_encoding => 'utf-8',
79         config => $config,
80         load_row_coderef => sub {
81                 my ($database,$input,$mfn) = shift;
82                 $store->load_row( database => $database, input => $input, id => $mfn );
83         },
84   );
85
86 Options C<row>, C<rules> and C<log> are mandatory while all
87 other are optional.
88
89 C<load_row_coderef> is closure only used when executing lookups, so they will
90 die if it's not defined.
91
92 This function will B<die> if normalizastion can't be evaled.
93
94 Since this function isn't exported you have to call it with 
95 C<WebPAC::Normalize::data_structure>.
96
97 =cut
98
99 my $load_row_coderef;
100
101 sub data_structure {
102         my $arg = {@_};
103
104         die "need row argument" unless ($arg->{row});
105         die "need normalisation argument" unless ($arg->{rules});
106
107         no strict 'subs';
108         _set_lookup( $arg->{lookup} ) if defined($arg->{lookup});
109         _set_rec( $arg->{row} );
110         _set_config( $arg->{config} ) if defined($arg->{config});
111         _clean_ds( %{ $arg } );
112         $load_row_coderef = $arg->{load_row_coderef};
113
114         eval "$arg->{rules}";
115         die "error evaling $arg->{rules}: $@\n" if ($@);
116
117         return _get_ds();
118 }
119
120 =head2 _set_rec
121
122 Set current record hash
123
124   _set_rec( $rec );
125
126 =cut
127
128 my $rec;
129
130 sub _set_rec {
131         $rec = shift or die "no record hash";
132 }
133
134 =head2 _set_config
135
136 Set current config hash
137
138   _set_config( $config );
139
140 Magic keys are:
141
142 =over 4
143
144 =item _
145
146 Code of current database
147
148 =item _mfn
149
150 Current MFN
151
152 =back
153
154 =cut
155
156 my $config;
157
158 sub _set_config {
159         $config = shift;
160 }
161
162 =head2 _get_ds
163
164 Return hash formatted as data structure
165
166   my $ds = _get_ds();
167
168 =cut
169
170 my ($out, $marc_record, $marc_encoding, $marc_repeatable_subfield, $marc_indicators, $marc_leader);
171 my ($marc_record_offset, $marc_fetch_offset) = (0, 0);
172
173 sub _get_ds {
174         return $out;
175 }
176
177 =head2 _clean_ds
178
179 Clean data structure hash for next record
180
181   _clean_ds();
182
183 =cut
184
185 sub _clean_ds {
186         my $a = {@_};
187         ($out,$marc_record, $marc_encoding, $marc_repeatable_subfield, $marc_indicators, $marc_leader) = ();
188         ($marc_record_offset, $marc_fetch_offset) = (0,0);
189         $marc_encoding = $a->{marc_encoding};
190 }
191
192 =head2 _set_lookup
193
194 Set current lookup hash
195
196   _set_lookup( $lookup );
197
198 =cut
199
200 my $lookup;
201
202 sub _set_lookup {
203         $lookup = shift;
204 }
205
206 =head2 _get_lookup
207
208 Get current lookup hash
209
210   my $lookup = _get_lookup();
211
212 =cut
213
214 sub _get_lookup {
215         return $lookup;
216 }
217
218 =head2 _set_load_row
219
220 Setup code reference which will return L<data_structure> from
221 L<WebPAC::Store>
222
223   _set_load_row(sub {
224                 my ($database,$input,$mfn) = @_;
225                 $store->load_row( database => $database, input => $input, id => $mfn );
226   });
227
228 =cut
229
230 sub _set_load_row {
231         my $coderef = shift;
232         confess "argument isn't CODE" unless ref($coderef) eq 'CODE';
233
234         $load_row_coderef = $coderef;
235 }
236
237 =head2 _get_marc_fields
238
239 Get all fields defined by calls to C<marc>
240
241         $marc->add_fields( WebPAC::Normalize:_get_marc_fields() );
242
243 We are using I<magic> which detect repeatable fields only from
244 sequence of field/subfield data generated by normalization.
245
246 Repeatable field is created when there is second occurence of same subfield or
247 if any of indicators are different.
248
249 This is sane for most cases. Something like:
250
251   900a-1 900b-1 900c-1
252   900a-2 900b-2
253   900a-3
254
255 will be created from any combination of:
256
257   900a-1 900a-2 900a-3 900b-1 900b-2 900c-1
258
259 and following rules:
260
261   marc('900','a', rec('200','a') );
262   marc('900','b', rec('200','b') );
263   marc('900','c', rec('200','c') );
264
265 which might not be what you have in mind. If you need repeatable subfield,
266 define it using C<marc_repeatable_subfield> like this:
267
268   marc_repeatable_subfield('900','a');
269   marc('900','a', rec('200','a') );
270   marc('900','b', rec('200','b') );
271   marc('900','c', rec('200','c') );
272
273 will create:
274
275   900a-1 900a-2 900a-3 900b-1 900c-1
276   900b-2
277
278 There is also support for returning next or specific using:
279
280   while (my $mf = WebPAC::Normalize:_get_marc_fields( fetch_next => 1 ) ) {
281         # do something with $mf
282   }
283
284 will always return fields from next MARC record or
285
286   my $mf = WebPAC::Normalize::_get_marc_fields( offset => 42 );
287
288 will return 42th copy record (if it exists).
289
290 =cut
291
292 my $fetch_pos;
293
294 sub _get_marc_fields {
295
296         my $arg = {@_};
297         warn "### _get_marc_fields arg: ", dump($arg), $/ if ($debug > 2);
298         $fetch_pos = $marc_fetch_offset;
299         if ($arg->{offset}) {
300                 $fetch_pos = $arg->{offset};
301         } elsif($arg->{fetch_next}) {
302                 $marc_fetch_offset++;
303         }
304
305         return if (! $marc_record || ref($marc_record) ne 'ARRAY');
306
307         warn "### full marc_record = ", dump( @{ $marc_record }), $/ if ($debug > 2);
308
309         my $marc_rec = $marc_record->[ $fetch_pos ];
310
311         warn "## _get_marc_fields (at offset: $fetch_pos) -- marc_record = ", dump( @$marc_rec ), $/ if ($debug > 1);
312
313         return if (! $marc_rec || ref($marc_rec) ne 'ARRAY' || $#{ $marc_rec } < 0);
314
315         # first, sort all existing fields 
316         # XXX might not be needed, but modern perl might randomize elements in hash
317         my @sorted_marc_record = sort {
318                 $a->[0] . ( $a->[3] || '' ) cmp $b->[0] . ( $b->[3] || '')
319         } @{ $marc_rec };
320
321         @sorted_marc_record = @{ $marc_rec };   ### FIXME disable sorting
322         
323         # output marc fields
324         my @m;
325
326         # count unique field-subfields (used for offset when walking to next subfield)
327         my $u;
328         map { $u->{ $_->[0] . ( $_->[3] || '')  }++ } @sorted_marc_record;
329
330         if ($debug) {
331                 warn "## marc_repeatable_subfield = ", dump( $marc_repeatable_subfield ), $/ if ( $marc_repeatable_subfield );
332                 warn "## marc_record[$fetch_pos] = ", dump( $marc_rec ), $/;
333                 warn "## sorted_marc_record = ", dump( \@sorted_marc_record ), $/;
334                 warn "## subfield count = ", dump( $u ), $/;
335         }
336
337         my $len = $#sorted_marc_record;
338         my $visited;
339         my $i = 0;
340         my $field;
341
342         foreach ( 0 .. $len ) {
343
344                 # find next element which isn't visited
345                 while ($visited->{$i}) {
346                         $i = ($i + 1) % ($len + 1);
347                 }
348
349                 # mark it visited
350                 $visited->{$i}++;
351
352                 my $row = dclone( $sorted_marc_record[$i] );
353
354                 # field and subfield which is key for
355                 # marc_repeatable_subfield and u
356                 my $fsf = $row->[0] . ( $row->[3] || '' );
357
358                 if ($debug > 1) {
359
360                         print "### field so far [", $#$field, "] : ", dump( $field ), " ", $field ? 'T' : 'F', $/;
361                         print "### this [$i]: ", dump( $row ),$/;
362                         print "### sf: ", $row->[3], " vs ", $field->[3],
363                                 $marc_repeatable_subfield->{ $row->[0] . $row->[3] } ? ' (repeatable)' : '', $/,
364                                 if ($#$field >= 0);
365
366                 }
367
368                 # if field exists
369                 if ( $#$field >= 0 ) {
370                         if (
371                                 $row->[0] ne $field->[0] ||             # field
372                                 $row->[1] ne $field->[1] ||             # i1
373                                 $row->[2] ne $field->[2]                # i2
374                         ) {
375                                 push @m, $field;
376                                 warn "## saved/1 ", dump( $field ),$/ if ($debug);
377                                 $field = $row;
378
379                         } elsif (
380                                 ( $row->[3] lt $field->[-2] )           # subfield which is not next (e.g. a after c)
381                                 ||
382                                 ( $row->[3] eq $field->[-2] &&          # same subfield, but not repeatable
383                                         ! $marc_repeatable_subfield->{ $fsf }
384                                 )
385                         ) {
386                                 push @m, $field;
387                                 warn "## saved/2 ", dump( $field ),$/ if ($debug);
388                                 $field = $row;
389
390                         } else {
391                                 # append new subfields to existing field
392                                 push @$field, ( $row->[3], $row->[4] );
393                         }
394                 } else {
395                         # insert first field
396                         $field = $row;
397                 }
398
399                 if (! $marc_repeatable_subfield->{ $fsf }) {
400                         # make step to next subfield
401                         $i = ($i + $u->{ $fsf } ) % ($len + 1);
402                 }
403         }
404
405         if ($#$field >= 0) {
406                 push @m, $field;
407                 warn "## saved/3 ", dump( $field ),$/ if ($debug);
408         }
409
410         return \@m;
411 }
412
413 =head2 _get_marc_leader
414
415 Return leader from currently fetched record by L</_get_marc_fields>
416
417   print WebPAC::Normalize::_get_marc_leader();
418
419 =cut
420
421 sub _get_marc_leader {
422         die "no fetch_pos, did you called _get_marc_fields first?" unless ( defined( $fetch_pos ) );
423         return $marc_leader->[ $fetch_pos ];
424 }
425
426 =head2 _debug
427
428 Change level of debug warnings
429
430   _debug( 2 );
431
432 =cut
433
434 sub _debug {
435         my $l = shift;
436         return $debug unless defined($l);
437         warn "debug level $l",$/ if ($l > 0);
438         $debug = $l;
439 }
440
441 =head1 Functions to create C<data_structure>
442
443 Those functions generally have to first in your normalization file.
444
445 =head2 tag
446
447 Define new tag for I<search> and I<display>.
448
449   tag('Title', rec('200','a') );
450
451
452 =cut
453
454 sub tag {
455         my $name = shift or die "tag needs name as first argument";
456         my @o = grep { defined($_) && $_ ne '' } @_;
457         return unless (@o);
458         $out->{$name}->{tag} = $name;
459         $out->{$name}->{search} = \@o;
460         $out->{$name}->{display} = \@o;
461 }
462
463 =head2 display
464
465 Define tag just for I<display>
466
467   @v = display('Title', rec('200','a') );
468
469 =cut
470
471 sub display {
472         my $name = shift or die "display needs name as first argument";
473         my @o = grep { defined($_) && $_ ne '' } @_;
474         return unless (@o);
475         $out->{$name}->{tag} = $name;
476         $out->{$name}->{display} = \@o;
477 }
478
479 =head2 search
480
481 Prepare values just for I<search>
482
483   @v = search('Title', rec('200','a') );
484
485 =cut
486
487 sub search {
488         my $name = shift or die "search needs name as first argument";
489         my @o = grep { defined($_) && $_ ne '' } @_;
490         return unless (@o);
491         $out->{$name}->{tag} = $name;
492         $out->{$name}->{search} = \@o;
493 }
494
495 =head2 marc_leader
496
497 Setup fields within MARC leader or get leader
498
499   marc_leader('05','c');
500   my $leader = marc_leader();
501
502 =cut
503
504 sub marc_leader {
505         my ($offset,$value) = @_;
506
507         if ($offset) {
508                 $marc_leader->[ $marc_record_offset ]->{ $offset } = $value;
509         } else {
510                 
511                 if (defined($marc_leader)) {
512                         die "marc_leader not array = ", dump( $marc_leader ) unless (ref($marc_leader) eq 'ARRAY');
513                         return $marc_leader->[ $marc_record_offset ];
514                 } else {
515                         return;
516                 }
517         }
518 }
519
520 =head2 marc_fixed
521
522 Create control/indentifier fields with values in fixed positions
523
524   marc_fixed('008', 00, '070402');
525   marc_fixed('008', 39, '|');
526
527 Positions not specified will be filled with spaces (C<0x20>).
528
529 There will be no effort to extend last specified value to full length of
530 field in standard.
531
532 =cut
533
534 sub marc_fixed {
535         my ($f, $pos, $val) = @_;
536         die "need marc(field, position, value)" unless defined($f) && defined($pos);
537
538         my $update = 0;
539
540         map {
541                 if ($_->[0] eq $f) {
542                         my $old = $_->[1];
543                         if (length($old) < $pos) {
544                                 $_->[1] .= ' ' x ( $pos - length($old) ) . $val;
545                                 warn "## marc_fixed($f,$pos,'$val') append '$old' -> '$_->[1]'\n" if ($debug > 1);
546                         } else {
547                                 $_->[1] = substr($old, 0, $pos) . $val . substr($old, $pos + length($val));
548                                 warn "## marc_fixed($f,$pos,'$val') update '$old' -> '$_->[1]'\n" if ($debug > 1);
549                         }
550                         $update++;
551                 }
552         } @{ $marc_record->[ $marc_record_offset ] };
553
554         if (! $update) {
555                 my $v = ' ' x $pos . $val;
556                 push @{ $marc_record->[ $marc_record_offset ] }, [ $f, $v ];
557                 warn "## marc_fixed($f,$pos,'val') created '$v'\n" if ($debug > 1);
558         }
559 }
560
561 =head2 marc
562
563 Save value for MARC field
564
565   marc('900','a', rec('200','a') );
566   marc('001', rec('000') );
567
568 =cut
569
570 sub marc {
571         my $f = shift or die "marc needs field";
572         die "marc field must be numer" unless ($f =~ /^\d+$/);
573
574         my $sf;
575         if ($f >= 10) {
576                 $sf = shift or die "marc needs subfield";
577         }
578
579         foreach (@_) {
580                 my $v = $_;             # make var read-write for Encode
581                 next unless (defined($v) && $v !~ /^\s*$/);
582                 my ($i1,$i2) = defined($marc_indicators->{$f}) ? @{ $marc_indicators->{$f} } : (' ',' ');
583                 if (defined $sf) {
584                         push @{ $marc_record->[ $marc_record_offset ] }, [ $f, $i1, $i2, $sf => $v ];
585                 } else {
586                         push @{ $marc_record->[ $marc_record_offset ] }, [ $f, $v ];
587                 }
588         }
589 }
590
591 =head2 marc_repeatable_subfield
592
593 Save values for MARC repetable subfield
594
595   marc_repeatable_subfield('910', 'z', rec('909') );
596
597 =cut
598
599 sub marc_repeatable_subfield {
600         my ($f,$sf) = @_;
601         die "marc_repeatable_subfield need field and subfield!\n" unless ($f && $sf);
602         $marc_repeatable_subfield->{ $f . $sf }++;
603         marc(@_);
604 }
605
606 =head2 marc_indicators
607
608 Set both indicators for MARC field
609
610   marc_indicators('900', ' ', 1);
611
612 Any indicator value other than C<0-9> will be treated as undefined.
613
614 =cut
615
616 sub marc_indicators {
617         my $f = shift || die "marc_indicators need field!\n";
618         my ($i1,$i2) = @_;
619         die "marc_indicators($f, ...) need i1!\n" unless(defined($i1));
620         die "marc_indicators($f, $i1, ...) need i2!\n" unless(defined($i2));
621
622         $i1 = ' ' if ($i1 !~ /^\d$/);
623         $i2 = ' ' if ($i2 !~ /^\d$/);
624         @{ $marc_indicators->{$f} } = ($i1,$i2);
625 }
626
627 =head2 marc_compose
628
629 Save values for each MARC subfield explicitly
630
631   marc_compose('900',
632         'a', rec('200','a')
633         'b', rec('201','a')
634         'a', rec('200','b')
635         'c', rec('200','c')
636   );
637
638 If you specify C<+> for subfield, value will be appended
639 to previous defined subfield.
640
641 =cut
642
643 sub marc_compose {
644         my $f = shift or die "marc_compose needs field";
645         die "marc_compose field must be numer" unless ($f =~ /^\d+$/);
646
647         my ($i1,$i2) = defined($marc_indicators->{$f}) ? @{ $marc_indicators->{$f} } : (' ',' ');
648         my $m = [ $f, $i1, $i2 ];
649
650         warn "### marc_compose input subfields = ", dump(@_),$/ if ($debug > 2);
651
652         if ($#_ % 2 != 1) {
653                 die "ERROR: marc_compose",dump($f,@_)," not valid (must be even).\nDo you need to add first() or join() around some argument?\n";
654         }
655
656         while (@_) {
657                 my $sf = shift;
658                 my $v = shift;
659
660                 next unless (defined($v) && $v !~ /^\s*$/);
661                 warn "## ++ marc_compose($f,$sf,$v) ", dump( $m ),$/ if ($debug > 1);
662                 if ($sf ne '+') {
663                         push @$m, ( $sf, $v );
664                 } else {
665                         $m->[ $#$m ] .= $v;
666                 }
667         }
668
669         warn "## marc_compose current marc = ", dump( $m ),$/ if ($debug > 1);
670
671         push @{ $marc_record->[ $marc_record_offset ] }, $m if ($#{$m} > 2);
672 }
673
674 =head2 marc_duplicate
675
676 Generate copy of current MARC record and continue working on copy
677
678   marc_duplicate();
679
680 Copies can be accessed using C<< _get_marc_fields( fetch_next => 1 ) >> or
681 C<< _get_marc_fields( offset => 42 ) >>.
682
683 =cut
684
685 sub marc_duplicate {
686          my $m = $marc_record->[ -1 ];
687          die "can't duplicate record which isn't defined" unless ($m);
688          push @{ $marc_record }, dclone( $m );
689          push @{ $marc_leader }, dclone( marc_leader() );
690          warn "## marc_duplicate = ", dump(@$marc_leader, @$marc_record), $/ if ($debug > 1);
691          $marc_record_offset = $#{ $marc_record };
692          warn "## marc_record_offset = $marc_record_offset", $/ if ($debug > 1);
693
694 }
695
696 =head2 marc_remove
697
698 Remove some field or subfield from MARC record.
699
700   marc_remove('200');
701   marc_remove('200','a');
702
703 This will erase field C<200> or C<200^a> from current MARC record.
704
705   marc_remove('*');
706
707 Will remove all fields in current MARC record.
708
709 This is useful after calling C<marc_duplicate> or on it's own (but, you
710 should probably just remove that subfield definition if you are not
711 using C<marc_duplicate>).
712
713 FIXME: support fields < 10.
714
715 =cut
716
717 sub marc_remove {
718         my ($f, $sf) = @_;
719
720         die "marc_remove needs record number" unless defined($f);
721
722         my $marc = $marc_record->[ $marc_record_offset ];
723
724         warn "### marc_remove before = ", dump( $marc ), $/ if ($debug > 2);
725
726         if ($f eq '*') {
727
728                 delete( $marc_record->[ $marc_record_offset ] );
729                 warn "## full marc_record = ", dump( @{ $marc_record }), $/ if ($debug > 1);
730
731         } else {
732
733                 my $i = 0;
734                 foreach ( 0 .. $#{ $marc } ) {
735                         last unless (defined $marc->[$i]);
736                         warn "#### working on ",dump( @{ $marc->[$i] }), $/ if ($debug > 3);
737                         if ($marc->[$i]->[0] eq $f) {
738                                 if (! defined $sf) {
739                                         # remove whole field
740                                         splice @$marc, $i, 1;
741                                         warn "#### slice \@\$marc, $i, 1 = ",dump( @{ $marc }), $/ if ($debug > 3);
742                                         $i--;
743                                 } else {
744                                         foreach my $j ( 0 .. (( $#{ $marc->[$i] } - 3 ) / 2) ) {
745                                                 my $o = ($j * 2) + 3;
746                                                 if ($marc->[$i]->[$o] eq $sf) {
747                                                         # remove subfield
748                                                         splice @{$marc->[$i]}, $o, 2;
749                                                         warn "#### slice \@{\$marc->[$i]}, $o, 2 = ", dump( @{ $marc }), $/ if ($debug > 3);
750                                                         # is record now empty?
751                                                         if ($#{ $marc->[$i] } == 2) {
752                                                                 splice @$marc, $i, 1;
753                                                                 warn "#### slice \@\$marc, $i, 1 = ", dump( @{ $marc }), $/ if ($debug > 3);
754                                                                 $i--;
755                                                         };
756                                                 }
757                                         }
758                                 }
759                         }
760                         $i++;
761                 }
762
763                 warn "### marc_remove($f", $sf ? ",$sf" : "", ") after = ", dump( $marc ), $/ if ($debug > 2);
764
765                 $marc_record->[ $marc_record_offset ] = $marc;
766         }
767
768         warn "## full marc_record = ", dump( @{ $marc_record }), $/ if ($debug > 1);
769 }
770
771 =head2 marc_original_order
772
773 Copy all subfields preserving original order to marc field.
774
775   marc_original_order( marc_field_number, original_input_field_number );
776
777 Please note that field numbers are consistent with other commands (marc
778 field number first), but somewhat counter-intuitive (destination and then
779 source).
780
781 You might want to use this command if you are just renaming subfields or
782 using pre-processing modify_record in C<config.yml> and don't need any
783 post-processing or want to preserve order of original subfields.
784
785
786 =cut
787
788 sub marc_original_order {
789
790         my ($to, $from) = @_;
791         die "marc_original_order needs from and to fields\n" unless ($from && $to);
792
793         return unless defined($rec->{$from});
794
795         my $r = $rec->{$from};
796         die "record field $from isn't array\n" unless (ref($r) eq 'ARRAY');
797
798         my ($i1,$i2) = defined($marc_indicators->{$to}) ? @{ $marc_indicators->{$to} } : (' ',' ');
799         warn "## marc_original_order($to,$from) source = ", dump( $r ),$/ if ($debug > 1);
800
801         foreach my $d (@$r) {
802
803                 if (! defined($d->{subfields}) && ref($d->{subfields}) ne 'ARRAY') {
804                         warn "# marc_original_order($to,$from): field $from doesn't have subfields specification\n";
805                         next;
806                 }
807         
808                 my @sfs = @{ $d->{subfields} };
809
810                 die "field $from doesn't have even number of subfields specifications\n" unless($#sfs % 2 == 1);
811
812                 warn "#--> d: ",dump($d), "\n#--> sfs: ",dump(@sfs),$/ if ($debug > 2);
813
814                 my $m = [ $to, $i1, $i2 ];
815
816                 while (my $sf = shift @sfs) {
817
818                         warn "#--> sf: ",dump($sf), $/ if ($debug > 2);
819                         my $offset = shift @sfs;
820                         die "corrupted sufields specification for field $from\n" unless defined($offset);
821
822                         my $v;
823                         if (ref($d->{$sf}) eq 'ARRAY') {
824                                 $v = $d->{$sf}->[$offset] if (defined($d->{$sf}->[$offset]));
825                         } elsif ($offset == 0) {
826                                 $v = $d->{$sf};
827                         } else {
828                                 die "field $from subfield '$sf' need occurence $offset which doesn't exist", dump($d->{$sf});
829                         }
830                         push @$m, ( $sf, $v ) if (defined($v));
831                 }
832
833                 if ($#{$m} > 2) {
834                         push @{ $marc_record->[ $marc_record_offset ] }, $m;
835                 }
836         }
837
838         warn "## marc_record = ", dump( $marc_record ),$/ if ($debug > 1);
839 }
840
841 =head2 marc_count
842
843 Return number of MARC records created using L</marc_duplicate>.
844
845   print "created ", marc_count(), " records";
846
847 =cut
848
849 sub marc_count {
850         return $#{ $marc_record };
851 }
852
853
854 =head1 Functions to extract data from input
855
856 This function should be used inside functions to create C<data_structure> described
857 above.
858
859 =head2 _pack_subfields_hash
860
861  @subfields = _pack_subfields_hash( $h );
862  $subfields = _pack_subfields_hash( $h, 1 );
863
864 Return each subfield value in array or pack them all together and return scalar
865 with subfields (denoted by C<^>) and values.
866
867 =cut
868
869 sub _pack_subfields_hash {
870
871         warn "## _pack_subfields_hash( ",dump(@_), " )\n" if ($debug > 1);
872
873         my ($h,$include_subfields) = @_;
874
875         if ( defined($h->{subfields}) ) {
876                 my $sfs = delete $h->{subfields} || die "no subfields?";
877                 my @out;
878                 while (@$sfs) {
879                         my $sf = shift @$sfs;
880                         push @out, '^' . $sf if ($include_subfields);
881                         my $o = shift @$sfs;
882                         if ($o == 0 && ref( $h->{$sf} ) ne 'ARRAY' ) {
883                                 # single element subfields are not arrays
884 #warn "====> $sf $o / $#$sfs ", dump( $sfs, $h->{$sf} ), "\n";
885
886                                 push @out, $h->{$sf};
887                         } else {
888 #warn "====> $sf $o / $#$sfs ", dump( $sfs, $h->{$sf} ), "\n";
889                                 push @out, $h->{$sf}->[$o];
890                         }
891                 }
892                 if ($include_subfields) {
893                         return join('', @out);
894                 } else {
895                         return @out;
896                 }
897         } else {
898                 if ($include_subfields) {
899                         my $out = '';
900                         foreach my $sf (sort keys %$h) {
901                                 if (ref($h->{$sf}) eq 'ARRAY') {
902                                         $out .= '^' . $sf . join('^' . $sf, @{ $h->{$sf} });
903                                 } else {
904                                         $out .= '^' . $sf . $h->{$sf};
905                                 }
906                         }
907                         return $out;
908                 } else {
909                         # FIXME this should probably be in alphabetical order instead of hash order
910                         values %{$h};
911                 }
912         }
913 }
914
915 =head2 rec1
916
917 Return all values in some field
918
919   @v = rec1('200')
920
921 TODO: order of values is probably same as in source data, need to investigate that
922
923 =cut
924
925 sub rec1 {
926         my $f = shift;
927         warn "rec1($f) = ", dump( $rec->{$f} ), $/ if ($debug > 1);
928         return unless (defined($rec) && defined($rec->{$f}));
929         warn "rec1($f) = ", dump( $rec->{$f} ), $/ if ($debug > 1);
930         if (ref($rec->{$f}) eq 'ARRAY') {
931                 my @out;
932                 foreach my $h ( @{ $rec->{$f} } ) {
933                         if (ref($h) eq 'HASH') {
934                                 push @out, ( _pack_subfields_hash( $h ) );
935                         } else {
936                                 push @out, $h;
937                         }
938                 }
939                 return @out;
940         } elsif( defined($rec->{$f}) ) {
941                 return $rec->{$f};
942         }
943 }
944
945 =head2 rec2
946
947 Return all values in specific field and subfield
948
949   @v = rec2('200','a')
950
951 =cut
952
953 sub rec2 {
954         my $f = shift;
955         return unless (defined($rec && $rec->{$f}));
956         my $sf = shift;
957         warn "rec2($f,$sf) = ", dump( $rec->{$f} ), $/ if ($debug > 1);
958         return map {
959                 if (ref($_->{$sf}) eq 'ARRAY') {
960                         @{ $_->{$sf} };
961                 } else {
962                         $_->{$sf};
963                 }
964         } grep { ref($_) eq 'HASH' && $_->{$sf} } @{ $rec->{$f} };
965 }
966
967 =head2 rec
968
969 syntaxtic sugar for
970
971   @v = rec('200')
972   @v = rec('200','a')
973
974 If rec() returns just single value, it will
975 return scalar, not array.
976
977 =cut
978
979 sub rec {
980         my @out;
981         if ($#_ == 0) {
982                 @out = rec1(@_);
983         } elsif ($#_ == 1) {
984                 @out = rec2(@_);
985         }
986         if ($#out == 0 && ! wantarray) {
987                 return $out[0];
988         } elsif (@out) {
989                 return @out;
990         } else {
991                 return '';
992         }
993 }
994
995 =head2 regex
996
997 Apply regex to some or all values
998
999   @v = regex( 's/foo/bar/g', @v );
1000
1001 =cut
1002
1003 sub regex {
1004         my $r = shift;
1005         my @out;
1006         #warn "r: $r\n", dump(\@_);
1007         foreach my $t (@_) {
1008                 next unless ($t);
1009                 eval "\$t =~ $r";
1010                 push @out, $t if ($t && $t ne '');
1011         }
1012         return @out;
1013 }
1014
1015 =head2 prefix
1016
1017 Prefix all values with a string
1018
1019   @v = prefix( 'my_', @v );
1020
1021 =cut
1022
1023 sub prefix {
1024         my $p = shift or return;
1025         return map { $p . $_ } grep { defined($_) } @_;
1026 }
1027
1028 =head2 suffix
1029
1030 suffix all values with a string
1031
1032   @v = suffix( '_my', @v );
1033
1034 =cut
1035
1036 sub suffix {
1037         my $s = shift or die "suffix needs string as first argument";
1038         return map { $_ . $s } grep { defined($_) } @_;
1039 }
1040
1041 =head2 surround
1042
1043 surround all values with a two strings
1044
1045   @v = surround( 'prefix_', '_suffix', @v );
1046
1047 =cut
1048
1049 sub surround {
1050         my $p = shift or die "surround need prefix as first argument";
1051         my $s = shift or die "surround needs suffix as second argument";
1052         return map { $p . $_ . $s } grep { defined($_) } @_;
1053 }
1054
1055 =head2 first
1056
1057 Return first element
1058
1059   $v = first( @v );
1060
1061 =cut
1062
1063 sub first {
1064         my $r = shift;
1065         return $r;
1066 }
1067
1068 =head2 lookup
1069
1070 Consult lookup hashes for some value
1071
1072   @v = lookup(
1073         sub {
1074                 'ffkk/peri/mfn'.rec('000')
1075         },
1076         'ffkk','peri','200-a-200-e',
1077         sub {
1078                 first(rec(200,'a')).' '.first(rec('200','e'))
1079         }
1080   );
1081
1082 Code like above will be B<automatically generated> using L<WebPAC::Parse> from
1083 normal lookup definition in C<conf/lookup/something.pl> which looks like:
1084
1085   lookup(
1086         # which results to return from record recorded in lookup
1087         sub { 'ffkk/peri/mfn' . rec('000') },
1088         # from which database and input
1089         'ffkk','peri',
1090         # such that following values match
1091         sub { first(rec(200,'a')) . ' ' . first(rec('200','e')) },
1092         # if this part is missing, we will try to match same fields
1093         # from lookup record and current one, or you can override
1094         # which records to use from current record using
1095         sub { rec('900','x') . ' ' . rec('900','y') },
1096   )
1097
1098 You can think about this lookup as SQL (if that helps):
1099
1100   select
1101         sub { what }
1102   from
1103         database, input
1104   where
1105     sub { filter from lookuped record }
1106   having
1107     sub { optional filter on current record }
1108
1109 Easy as pie, right?
1110
1111 =cut
1112
1113 sub lookup {
1114         my ($what, $database, $input, $key, $having) = @_;
1115
1116         confess "lookup needs 5 arguments: what, database, input, key, having\n" unless ($#_ == 4);
1117
1118         warn "## lookup ($database, $input, $key)", $/ if ($debug > 1);
1119         return unless (defined($lookup->{$database}->{$input}->{$key}));
1120
1121         confess "lookup really need load_row_coderef added to data_structure\n" unless ($load_row_coderef);
1122
1123         my $mfns;
1124         my @having = $having->();
1125
1126         warn "## having = ", dump( @having ) if ($debug > 2);
1127
1128         foreach my $h ( @having ) {
1129                 if (defined($lookup->{$database}->{$input}->{$key}->{$h})) {
1130                         warn "lookup for $database/$input/$key/$h return ",dump($lookup->{$database}->{$input}->{$key}->{$h}),"\n" if ($debug);
1131                         $mfns->{$_}++ foreach keys %{ $lookup->{$database}->{$input}->{$key}->{$h} };
1132                 }
1133         }
1134
1135         return unless ($mfns);
1136
1137         my @mfns = sort keys %$mfns;
1138
1139         warn "# lookup loading $database/$input/$key mfn ", join(",",@mfns)," having ",dump(@having),"\n" if ($debug);
1140
1141         my $old_rec = $rec;
1142         my @out;
1143
1144         foreach my $mfn (@mfns) {
1145                 $rec = $load_row_coderef->( $database, $input, $mfn );
1146
1147                 warn "got $database/$input/$mfn = ", dump($rec), $/ if ($debug);
1148
1149                 my @vals = $what->();
1150
1151                 push @out, ( @vals );
1152
1153                 warn "lookup for mfn $mfn returned ", dump(@vals), $/ if ($debug);
1154         }
1155
1156 #       if (ref($lookup->{$k}) eq 'ARRAY') {
1157 #               return @{ $lookup->{$k} };
1158 #       } else {
1159 #               return $lookup->{$k};
1160 #       }
1161
1162         $rec = $old_rec;
1163
1164         warn "## lookup returns = ", dump(@out), $/ if ($debug);
1165
1166         if ($#out == 0) {
1167                 return $out[0];
1168         } else {
1169                 return @out;
1170         }
1171 }
1172
1173 =head2 save_into_lookup
1174
1175 Save value into lookup. It associates current database, input
1176 and specific keys with one or more values which will be
1177 associated over MFN.
1178
1179 MFN will be extracted from first occurence current of field 000
1180 in current record, or if it doesn't exist from L<_set_config> C<_mfn>.
1181
1182   my $nr = save_into_lookup($database,$input,$key,sub {
1183         # code which produce one or more values 
1184   });
1185
1186 It returns number of items saved.
1187
1188 This function shouldn't be called directly, it's called from code created by
1189 L<WebPAC::Parser>. 
1190
1191 =cut
1192
1193 sub save_into_lookup {
1194         my ($database,$input,$key,$coderef) = @_;
1195         die "save_into_lookup needs database" unless defined($database);
1196         die "save_into_lookup needs input" unless defined($input);
1197         die "save_into_lookup needs key" unless defined($key);
1198         die "save_into_lookup needs CODE" unless ( defined($coderef) && ref($coderef) eq 'CODE' );
1199
1200         warn "## save_into_lookup rec = ", dump($rec), " config = ", dump($config), $/ if ($debug > 2);
1201
1202         my $mfn = 
1203                 defined($rec->{'000'}->[0])     ?       $rec->{'000'}->[0]      :
1204                 defined($config->{_mfn})        ?       $config->{_mfn}         :
1205                                                                                 die "mfn not defined or zero";
1206
1207         my $nr = 0;
1208
1209         foreach my $v ( $coderef->() ) {
1210                 $lookup->{$database}->{$input}->{$key}->{$v}->{$mfn}++;
1211                 warn "# saved lookup $database/$input/$key [$v] $mfn\n" if ($debug > 1);
1212                 $nr++;
1213         }
1214
1215         return $nr;
1216 }
1217
1218 =head2 config
1219
1220 Consult config values stored in C<config.yml>
1221
1222   # return database code (key under databases in yaml)
1223   $database_code = config();    # use _ from hash
1224   $database_name = config('name');
1225   $database_input_name = config('input name');
1226   $tag = config('input normalize tag');
1227
1228 Up to three levels are supported.
1229
1230 =cut
1231
1232 sub config {
1233         return unless ($config);
1234
1235         my $p = shift;
1236
1237         $p ||= '';
1238
1239         my $v;
1240
1241         warn "### getting config($p)\n" if ($debug > 1);
1242
1243         my @p = split(/\s+/,$p);
1244         if ($#p < 0) {
1245                 $v = $config->{ '_' };  # special, database code
1246         } else {
1247
1248                 my $c = dclone( $config );
1249
1250                 foreach my $k (@p) {
1251                         warn "### k: $k c = ",dump($c),$/ if ($debug > 1);
1252                         if (ref($c) eq 'ARRAY') {
1253                                 $c = shift @$c;
1254                                 warn "config($p) taking first occurence of '$k', probably not what you wanted!\n";
1255                                 last;
1256                         }
1257
1258                         if (! defined($c->{$k}) ) {
1259                                 $c = undef;
1260                                 last;
1261                         } else {
1262                                 $c = $c->{$k};
1263                         }
1264                 }
1265                 $v = $c if ($c);
1266
1267         }
1268
1269         warn "## config( '$p' ) = ",dump( $v ),$/ if ($v && $debug);
1270         warn "config( '$p' ) is empty\n" if (! $v);
1271
1272         return $v;
1273 }
1274
1275 =head2 id
1276
1277 Returns unique id of this record
1278
1279   $id = id();
1280
1281 Returns C<42/2> for 2nd occurence of MFN 42.
1282
1283 =cut
1284
1285 sub id {
1286         my $mfn = $config->{_mfn} || die "no _mfn in config data";
1287         return $mfn . $#{$marc_record} ? $#{$marc_record} + 1 : '';
1288 }
1289
1290 =head2 join_with
1291
1292 Joins walues with some delimiter
1293
1294   $v = join_with(", ", @v);
1295
1296 =cut
1297
1298 sub join_with {
1299         my $d = shift;
1300         warn "### join_with('$d',",dump(@_),")\n" if ($debug > 2);
1301         my $v = join($d, grep { defined($_) && $_ ne '' } @_);
1302         return '' unless defined($v);
1303         return $v;
1304 }
1305
1306 =head2 split_rec_on
1307
1308 Split record subfield on some regex and take one of parts out
1309
1310   $a_before_semi_column =
1311         split_rec_on('200','a', /\s*;\s*/, $part);
1312
1313 C<$part> is optional number of element. First element is
1314 B<1>, not 0!
1315
1316 If there is no C<$part> parameter or C<$part> is 0, this function will
1317 return all values produced by splitting.
1318
1319 =cut
1320
1321 sub split_rec_on {
1322         die "split_rec_on need (fld,sf,regex[,part]" if ($#_ < 2);
1323
1324         my ($fld, $sf, $regex, $part) = @_;
1325         warn "### regex ", ref($regex), $regex, $/ if ($debug > 2);
1326
1327         my @r = rec( $fld, $sf );
1328         my $v = shift @r;
1329         warn "### first rec($fld,$sf) = ",dump($v),$/ if ($debug > 2);
1330
1331         return '' if ( ! defined($v) || $v =~ /^\s*$/);
1332
1333         my @s = split( $regex, $v );
1334         warn "## split_rec_on($fld,$sf,$regex,$part) = ",dump(@s),$/ if ($debug > 1);
1335         if ($part && $part > 0) {
1336                 return $s[ $part - 1 ];
1337         } else {
1338                 return @s;
1339         }
1340 }
1341
1342 my $hash;
1343
1344 =head2 set
1345
1346   set( key => 'value' );
1347
1348 =cut
1349
1350 sub set {
1351         my ($k,$v) = @_;
1352         warn "## set ( $k => ", dump($v), " )", $/ if ( $debug );
1353         $hash->{$k} = $v;
1354 };
1355
1356 =head2 get
1357
1358   get( 'key' );
1359
1360 =cut
1361
1362 sub get {
1363         my $k = shift || return;
1364         my $v = $hash->{$k};
1365         warn "## get $k = ", dump( $v ), $/ if ( $debug );
1366         return $v;
1367 }
1368
1369 =head2 count
1370
1371   if ( count( @result ) == 1 ) {
1372         # do something if only 1 result is there
1373   }
1374
1375 =cut
1376
1377 sub count {
1378         warn "## count ",dump(@_),$/ if ( $debug );
1379         return @_ . '';
1380 }
1381
1382 # END
1383 1;