Bug 22246: Fix indexing of large fields with Elasticsearch
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
index 04ec0d9..5cd0ad9 100644 (file)
@@ -34,6 +34,12 @@ use Search::Elasticsearch;
 use Try::Tiny;
 use YAML::Syck;
 
+use List::Util qw( sum0 reduce );
+use MARC::File::XML;
+use MIME::Base64;
+use Encode qw(encode);
+use Business::ISBN;
+
 __PACKAGE__->mk_ro_accessors(qw( index ));
 __PACKAGE__->mk_accessors(qw( sort_fields ));
 
@@ -63,10 +69,33 @@ sub new {
     my $class = shift @_;
     my $self = $class->SUPER::new(@_);
     # Check for a valid index
-    croak('No index name provided') unless $self->index;
+    Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
     return $self;
 }
 
+=head2 get_elasticsearch
+
+    my $elasticsearch_client = $self->get_elasticsearch();
+
+Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
+instance level and will be reused if method is called multiple times.
+
+=cut
+
+sub get_elasticsearch {
+    my $self = shift @_;
+    unless (defined $self->{elasticsearch}) {
+        my $conf = $self->get_elasticsearch_params();
+        $self->{elasticsearch} = Search::Elasticsearch->new(
+            client => "5_0::Direct",
+            nodes => $conf->{nodes},
+            cxn_pool => 'Sniff',
+            request_timeout => 60
+        );
+    }
+    return $self->{elasticsearch};
+}
+
 =head2 get_elasticsearch_params
 
     my $params = $self->get_elasticsearch_params();
@@ -125,8 +154,8 @@ sub get_elasticsearch_params {
 
     my $settings = $self->get_elasticsearch_settings();
 
-This provides the settings provided to elasticsearch when an index is created.
-These can do things like define tokenisation methods.
+This provides the settings provided to Elasticsearch when an index is created.
+These can do things like define tokenization methods.
 
 A hashref containing the settings is returned.
 
@@ -150,7 +179,7 @@ sub get_elasticsearch_settings {
 
     my $mappings = $self->get_elasticsearch_mappings();
 
-This provides the mappings that get passed to elasticsearch when an index is
+This provides the mappings that get passed to Elasticsearch when an index is
 created.
 
 =cut
@@ -165,7 +194,7 @@ sub get_elasticsearch_mappings {
     if (!defined $all_mappings{$self->index}) {
         $sort_fields{$self->index} = {};
         my $mappings = {
-            data => _get_elasticsearch_mapping('general', '')
+            data => scalar _get_elasticsearch_mapping('general', '')
         };
         my $marcflavour = lc C4::Context->preference('marcflavour');
         $self->_foreach_mapping(
@@ -212,7 +241,7 @@ sub get_elasticsearch_mappings {
 
 =head2 _get_elasticsearch_mapping
 
-Get the ES mappings for the given purpose and data type
+Get the Elasticsearch mappings for the given purpose and data type.
 
 $mapping = _get_elasticsearch_mapping('search', 'text');
 
@@ -242,7 +271,7 @@ sub _get_elasticsearch_mapping {
     if (defined $settings->{$purpose}{'default'}) {
         return $settings->{$purpose}{'default'};
     }
-    return undef;
+    return;
 }
 
 sub reset_elasticsearch_mappings {
@@ -281,48 +310,481 @@ sub sort_fields {
     return $self->_sort_fields_accessor();
 }
 
-# Provides the rules for data conversion.
-sub get_fixer_rules {
-    my ($self) = @_;
+=head2 _process_mappings($mappings, $data, $record_document, $altscript)
 
-    my $marcflavour = lc C4::Context->preference('marcflavour');
-    my @rules;
+    $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
 
-    $self->_foreach_mapping(
-        sub {
-            my ( $name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field ) = @_;
-            return if $marc_type ne $marcflavour;
-            my $options ='';
+Process all C<$mappings> targets operating on a specific MARC field C<$data>.
+Since we group all mappings by MARC field targets C<$mappings> will contain
+all targets for C<$data> and thus we need to fetch the MARC field only once.
+C<$mappings> will be applied to C<$record_document> and new field values added.
+The method has no return value.
+
+=over 4
+
+=item C<$mappings>
+
+Arrayref of mappings containing arrayrefs in the format
+[C<$target>, C<$options>] where C<$target> is the name of the target field and
+C<$options> is a hashref containing processing directives for this particular
+mapping.
 
-            push @rules, "marc_map('$marc_field','${name}.\$append', $options)";
-            if ($facet) {
-                push @rules, "marc_map('$marc_field','${name}__facet.\$append', $options)";
+=item C<$data>
+
+The source data from a MARC record field.
+
+=item C<$record_document>
+
+Hashref representing the Elasticsearch document on which mappings should be
+applied.
+
+=item C<$altscript>
+
+A boolean value indicating whether an alternate script presentation is being
+processed.
+
+=back
+
+=cut
+
+sub _process_mappings {
+    my ($_self, $mappings, $data, $record_document, $altscript) = @_;
+    foreach my $mapping (@{$mappings}) {
+        my ($target, $options) = @{$mapping};
+
+        # Don't process sort fields for alternate scripts
+        my $sort = $target =~ /__sort$/;
+        if ($sort && $altscript) {
+            next;
+        }
+
+        # Copy (scalar) data since can have multiple targets
+        # with differing options for (possibly) mutating data
+        # so need a different copy for each
+        my $_data = $data;
+        $record_document->{$target} //= [];
+        if (defined $options->{substr}) {
+            my ($start, $length) = @{$options->{substr}};
+            $_data = length($data) > $start ? substr $data, $start, $length : '';
+        }
+        if (defined $options->{value_callbacks}) {
+            $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
+        }
+        if (defined $options->{property}) {
+            $_data = {
+                $options->{property} => $_data
             }
-            if ($suggestible) {
-                push @rules,
-                    #"marc_map('$marc_field','${name}__suggestion.input.\$append', '')"; #must not have nested data structures in .input
-                    "marc_map('$marc_field','${name}__suggestion.input.\$append')";
+        }
+        push @{$record_document->{$target}}, $_data;
+    }
+}
+
+=head2 marc_records_to_documents($marc_records)
+
+    my @record_documents = $self->marc_records_to_documents($marc_records);
+
+Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
+
+Returns array of hash references, representing Elasticsearch documents,
+acceptable as body payload in C<Search::Elasticsearch> requests.
+
+=over 4
+
+=item C<$marc_documents>
+
+Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
+
+=back
+
+=cut
+
+sub marc_records_to_documents {
+    my ($self, $records) = @_;
+    my $rules = $self->_get_marc_mapping_rules();
+    my $control_fields_rules = $rules->{control_fields};
+    my $data_fields_rules = $rules->{data_fields};
+    my $marcflavour = lc C4::Context->preference('marcflavour');
+
+    my @record_documents;
+
+    foreach my $record (@{$records}) {
+        my $record_document = {};
+        my $mappings = $rules->{leader};
+        if ($mappings) {
+            $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
+        }
+        foreach my $field ($record->fields()) {
+            if ($field->is_control_field()) {
+                my $mappings = $control_fields_rules->{$field->tag()};
+                if ($mappings) {
+                    $self->_process_mappings($mappings, $field->data(), $record_document, 0);
+                }
             }
-            if ( $type eq 'boolean' ) {
+            else {
+                my $tag = $field->tag();
+                # Handle alternate scripts in MARC 21
+                my $altscript = 0;
+                if ($marcflavour eq 'marc21' && $tag eq '880') {
+                    my $sub6 = $field->subfield('6');
+                    if ($sub6 =~ /^(...)-\d+/) {
+                        $tag = $1;
+                        $altscript = 1;
+                    }
+                }
 
-                # boolean gets special handling, basically if it doesn't exist,
-                # it's added and set to false. Otherwise we can't query it.
-                push @rules,
-                  "unless exists('$name') add_field('$name', 0) end";
+                my $data_field_rules = $data_fields_rules->{$tag};
+
+                if ($data_field_rules) {
+                    my $subfields_mappings = $data_field_rules->{subfields};
+                    my $wildcard_mappings = $subfields_mappings->{'*'};
+                    foreach my $subfield ($field->subfields()) {
+                        my ($code, $data) = @{$subfield};
+                        my $mappings = $subfields_mappings->{$code} // [];
+                        if ($wildcard_mappings) {
+                            $mappings = [@{$mappings}, @{$wildcard_mappings}];
+                        }
+                        if (@{$mappings}) {
+                            $self->_process_mappings($mappings, $data, $record_document, $altscript);
+                        }
+                    }
+
+                    my $subfields_join_mappings = $data_field_rules->{subfields_join};
+                    if ($subfields_join_mappings) {
+                        foreach my $subfields_group (keys %{$subfields_join_mappings}) {
+                            # Map each subfield to values, remove empty values, join with space
+                            my $data = join(
+                                ' ',
+                                grep(
+                                    $_,
+                                    map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
+                                )
+                            );
+                            if ($data) {
+                                $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
+                            }
+                        }
+                    }
+                }
             }
-            if ($type eq 'sum' ) {
-                push @rules, "sum('$name')";
+        }
+        foreach my $field (keys %{$rules->{defaults}}) {
+            unless (defined $record_document->{$field}) {
+                $record_document->{$field} = $rules->{defaults}->{$field};
+            }
+        }
+        foreach my $field (@{$rules->{sum}}) {
+            if (defined $record_document->{$field}) {
+                # TODO: validate numeric? filter?
+                # TODO: Or should only accept fields without nested values?
+                # TODO: Quick and dirty, improve if needed
+                $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
             }
-            if ($self->sort_fields()->{$name}) {
-                if ($sort || !defined $sort) {
-                    push @rules, "marc_map('$marc_field','${name}__sort.\$append', $options)";
+        }
+        # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
+        foreach my $field (@{$rules->{isbn}}) {
+            if (defined $record_document->{$field}) {
+                my @isbns = ();
+                foreach my $input_isbn (@{$record_document->{$field}}) {
+                    my $isbn = Business::ISBN->new($input_isbn);
+                    if (defined $isbn && $isbn->is_valid) {
+                        my $isbn13 = $isbn->as_isbn13->as_string;
+                        push @isbns, $isbn13;
+                        $isbn13 =~ s/\-//g;
+                        push @isbns, $isbn13;
+
+                        my $isbn10 = $isbn->as_isbn10;
+                        if ($isbn10) {
+                            $isbn10 = $isbn10->as_string;
+                            push @isbns, $isbn10;
+                            $isbn10 =~ s/\-//g;
+                            push @isbns, $isbn10;
+                        }
+                    } else {
+                        push @isbns, $input_isbn;
+                    }
                 }
+                $record_document->{$field} = \@isbns;
             }
         }
-    );
 
-    push @rules, "move_field(_id,es_id)"; #Also you must set the Catmandu::Store::ElasticSearch->new(key_prefix: 'es_');
-    return \@rules;
+        # Remove duplicate values and collapse sort fields
+        foreach my $field (keys %{$record_document}) {
+            if (ref($record_document->{$field}) eq 'ARRAY') {
+                @{$record_document->{$field}} = do {
+                    my %seen;
+                    grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
+                };
+                if ($field =~ /__sort$/) {
+                    # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
+                    $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
+                }
+            }
+        }
+
+        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
+        $record->encoding('UTF-8');
+        my @warnings;
+        {
+            # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
+            local $SIG{__WARN__} = sub {
+                push @warnings, $_[0];
+            };
+            $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
+        }
+        if (@warnings) {
+            # Suppress warnings if record length exceeded
+            unless (substr($record->leader(), 0, 5) eq '99999') {
+                foreach my $warning (@warnings) {
+                    carp $warning;
+                }
+            }
+            $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
+            $record_document->{'marc_format'} = 'MARCXML';
+        }
+        else {
+            $record_document->{'marc_format'} = 'base64ISO2709';
+        }
+        my $id = $record->subfield('999', 'c');
+        push @record_documents, [$id, $record_document];
+    }
+    return \@record_documents;
+}
+
+=head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
+
+    my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
+
+Get mappings, an internal data structure later used by
+L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
+data for a MARC mapping.
+
+The returned C<$mappings> is not to to be confused with mappings provided by
+C<_foreach_mapping>, rather this sub accepts properties from a mapping as
+provided by C<_foreach_mapping> and expands it to this internal data structure.
+In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
+is then applied to each MARC target (leader, control field data, subfield or
+joined subfields) and integrated into the mapping rules data structure used in
+C<marc_records_to_documents> to transform MARC records into Elasticsearch
+documents.
+
+=over 4
+
+=item C<$facet>
+
+Boolean indicating whether to create a facet field for this mapping.
+
+=item C<$suggestible>
+
+Boolean indicating whether to create a suggestion field for this mapping.
+
+=item C<$sort>
+
+Boolean indicating whether to create a sort field for this mapping.
+
+=item C<$target_name>
+
+Elasticsearch document target field name.
+
+=item C<$target_type>
+
+Elasticsearch document target field type.
+
+=item C<$range>
+
+An optional range as a string in the format "<START>-<END>" or "<START>",
+where "<START>" and "<END>" are integers specifying a range that will be used
+for extracting a substring from MARC data as Elasticsearch field target value.
+
+The first character position is "1", and the range is inclusive,
+so "1-3" means the first three characters of MARC data.
+
+If only "<START>" is provided only one character at position "<START>" will
+be extracted.
+
+=back
+
+=cut
+
+sub _field_mappings {
+    my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
+    my %mapping_defaults = ();
+    my @mappings;
+
+    my $substr_args = undef;
+    if ($range) {
+        # TODO: use value_callback instead?
+        my ($start, $end) = map(int, split /-/, $range, 2);
+        $substr_args = [$start];
+        push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
+    }
+    my $default_options = {};
+    if ($substr_args) {
+        $default_options->{substr} = $substr_args;
+    }
+
+    # TODO: Should probably have per type value callback/hook
+    # but hard code for now
+    if ($target_type eq 'boolean') {
+        $default_options->{value_callbacks} //= [];
+        push @{$default_options->{value_callbacks}}, sub {
+            my ($value) = @_;
+            # Trim whitespace at both ends
+            $value =~ s/^\s+|\s+$//g;
+            return $value ? 'true' : 'false';
+        };
+    }
+
+    my $mapping = [$target_name, $default_options];
+    push @mappings, $mapping;
+
+    my @suffixes = ();
+    push @suffixes, 'facet' if $facet;
+    push @suffixes, 'suggestion' if $suggestible;
+    push @suffixes, 'sort' if !defined $sort || $sort;
+
+    foreach my $suffix (@suffixes) {
+        my $mapping = ["${target_name}__$suffix"];
+        # TODO: Hack, fix later in less hideous manner
+        if ($suffix eq 'suggestion') {
+            push @{$mapping}, {%{$default_options}, property => 'input'};
+        }
+        else {
+            push @{$mapping}, $default_options;
+        }
+        push @mappings, $mapping;
+    }
+    return @mappings;
+};
+
+=head2 _get_marc_mapping_rules
+
+    my $mapping_rules = $self->_get_marc_mapping_rules()
+
+Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
+
+Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
+each call to C<MARC::Record>->field) we create an optimized structure of mapping
+rules keyed by MARC field tags holding all the mapping rules for that particular tag.
+
+We can then iterate through all MARC fields for each record and apply all relevant
+rules once per fields instead of retreiving fields multiple times for each mapping rule
+which is terribly slow.
+
+=cut
+
+# TODO: This structure can be used for processing multiple MARC::Records so is currently
+# rebuilt for each batch. Since it is cacheable it could also be stored in an in
+# memory cache which it is currently not. The performance gain of caching
+# would probably be marginal, but to do this could be a further improvement.
+
+sub _get_marc_mapping_rules {
+    my ($self) = @_;
+    my $marcflavour = lc C4::Context->preference('marcflavour');
+    my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
+    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
+    my $rules = {
+        'leader' => [],
+        'control_fields' => {},
+        'data_fields' => {},
+        'sum' => [],
+        'isbn' => [],
+        'defaults' => {}
+    };
+
+    $self->_foreach_mapping(sub {
+        my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
+        return if $marc_type ne $marcflavour;
+
+        if ($type eq 'sum') {
+            push @{$rules->{sum}}, $name;
+        }
+        elsif ($type eq 'isbn') {
+            push @{$rules->{isbn}}, $name;
+        }
+        elsif ($type eq 'boolean') {
+            # boolean gets special handling, if value doesn't exist for a field,
+            # it is set to false
+            $rules->{defaults}->{$name} = 'false';
+        }
+
+        if ($marc_field =~ $field_spec_regexp) {
+            my $field_tag = $1;
+
+            my @subfields;
+            my @subfield_groups;
+            # Parse and separate subfields form subfield groups
+            if (defined $2) {
+                my $subfield_group = '';
+                my $open_group = 0;
+
+                foreach my $token (split //, $2) {
+                    if ($token eq "(") {
+                        if ($open_group) {
+                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
+                                "Unmatched opening parenthesis for $marc_field"
+                            );
+                        }
+                        else {
+                            $open_group = 1;
+                        }
+                    }
+                    elsif ($token eq ")") {
+                        if ($open_group) {
+                            if ($subfield_group) {
+                                push @subfield_groups, $subfield_group;
+                                $subfield_group = '';
+                            }
+                            $open_group = 0;
+                        }
+                        else {
+                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
+                                "Unmatched closing parenthesis for $marc_field"
+                            );
+                        }
+                    }
+                    elsif ($open_group) {
+                        $subfield_group .= $token;
+                    }
+                    else {
+                        push @subfields, $token;
+                    }
+                }
+            }
+            else {
+                push @subfields, '*';
+            }
+
+            my $range = defined $3 ? $3 : undef;
+            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
+
+            if ($field_tag < 10) {
+                $rules->{control_fields}->{$field_tag} //= [];
+                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
+            }
+            else {
+                $rules->{data_fields}->{$field_tag} //= {};
+                foreach my $subfield (@subfields) {
+                    $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
+                    push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
+                }
+                foreach my $subfield_group (@subfield_groups) {
+                    $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
+                    push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
+                }
+            }
+        }
+        elsif ($marc_field =~ $leader_regexp) {
+            my $range = defined $1 ? $1 : undef;
+            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
+            push @{$rules->{leader}}, @mappings;
+        }
+        else {
+            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
+                "Invalid MARC field expression: $marc_field"
+            );
+        }
+    });
+    return $rules;
 }
 
 =head2 _foreach_mapping
@@ -407,7 +869,9 @@ sub _foreach_mapping {
 
     while ( my $search_field = $search_fields->next ) {
         $sub->(
-            $search_field->name,
+            # Force lower case on indexed field names for case insensitive
+            # field name searches
+            lc($search_field->name),
             $search_field->type,
             $search_field->get_column('facet'),
             $search_field->get_column('suggestible'),