1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
33 use Search::Elasticsearch;
37 use List::Util qw( sum0 reduce );
38 use Search::Elasticsearch;
41 use Encode qw(encode);
# Class::Accessor generates a read-only accessor for "index" ...
43 __PACKAGE__->mk_ro_accessors(qw( index ));
# ... and a read/write accessor for "sort_fields".
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
46 # Constants to refer to the standard index names
# Both are package variables ("our") made read-only through Readonly.
47 Readonly our $BIBLIOS_INDEX => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
60 The name of the index to use, generally 'biblios' or 'authorities'.
70 my $self = $class->SUPER::new(@_);
71 # Check for a valid index
72 croak('No index name provided') unless $self->index;
76 =head2 get_elasticsearch
78 my $elasticsearch_client = $self->get_elasticsearch();
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
81 instance level and will be reused if method is called multiple times.
# Return the Search::Elasticsearch client kept in $self->{elasticsearch},
# constructing it first when that slot is still undefined.
85 sub get_elasticsearch {
# Only build a client when none has been stored on this instance yet.
87 unless (defined $self->{elasticsearch}) {
88 my $conf = $self->get_elasticsearch_params();
89 $self->{elasticsearch} = Search::Elasticsearch->new(
# Client implementation name handed to Search::Elasticsearch->new.
90 client => "5_0::Direct",
# Node list as prepared by get_elasticsearch_params().
91 nodes => $conf->{nodes},
96 return $self->{elasticsearch};
99 =head2 get_elasticsearch_params
101 my $params = $self->get_elasticsearch_params();
103 This provides a hashref that contains the parameters for connecting to the
104 Elasticsearch servers, in the form:
107 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108 'index_name' => 'koha_instance_index',
111 This is configured by the following in the C<config> block in koha-conf.xml:
114 <server>127.0.0.1:9200</server>
115 <server>anotherserver:9200</server>
116 <index_name>koha_instance</index_name>
# Build the connection parameters from the 'elasticsearch' entry of the Koha
# configuration. In the resulting hashref the 'server' key is replaced by
# 'nodes' (an arrayref), 'index_name' gets "_<index>" appended and
# 'key_prefix' is set to 'es_'. Dies with the messages below when the
# configuration block, the server list or the index name is missing.
121 sub get_elasticsearch_params {
124 # Copy the hash so that we're not modifying the original
125 my $conf = C4::Context->config('elasticsearch');
126 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
# Shallow copy: only the top-level keys are duplicated.
127 my $es = { %{ $conf } };
129 # Helpfully, the multiple server lines end up in an array for us anyway
130 # if there are multiple ones, but not if there's only one.
131 my $server = $es->{server};
# 'server' is removed from the copy; its value is re-stored as 'nodes'.
132 delete $es->{server};
133 if ( ref($server) eq 'ARRAY' ) {
135 # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136 $es->{nodes} = $server;
# A non-array value is wrapped so that 'nodes' is always an arrayref.
139 $es->{nodes} = [$server];
142 die "No elasticsearch servers were specified in koha-conf.xml.\n";
144 die "No elasticserver index_name was specified in koha-conf.xml.\n"
145 if ( !$es->{index_name} );
146 # Append the name of this particular index to our namespace
147 $es->{index_name} .= '_' . $self->index;
# Fixed key prefix stored alongside the connection settings.
149 $es->{key_prefix} = 'es_';
153 =head2 get_elasticsearch_settings
155 my $settings = $self->get_elasticsearch_settings();
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
160 A hashref containing the settings is returned.
# Load the index settings from a YAML file and keep them in a state variable,
# so the file is only read while $settings is still undefined.
164 sub get_elasticsearch_settings {
167 # Use state to speed up repeated calls
168 state $settings = undef;
169 if (!defined $settings) {
# Path comes from the 'elasticsearch_index_config' config entry ...
170 my $config_file = C4::Context->config('elasticsearch_index_config');
# ... and falls back to index_config.yaml below 'intranetdir' when that
# entry is false.
171 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172 $settings = LoadFile( $config_file );
178 =head2 get_elasticsearch_mappings
180 my $mappings = $self->get_elasticsearch_mappings();
182 This provides the mappings that get passed to Elasticsearch when an index is
# Build (once per index name) the Elasticsearch mappings for this index from
# the stored field mappings, record which fields are sortable, publish those
# through the sort_fields accessor and return the mappings for this index.
187 sub get_elasticsearch_mappings {
190 # Use state to speed up repeated calls
# Only build the mappings when none are stored for this index name yet.
194 if (!defined $all_mappings{$self->index}) {
195 $sort_fields{$self->index} = {};
# Base definition for 'data' comes from the 'general' purpose of the
# field config; scalar() forces scalar context on the call.
197 data => scalar _get_elasticsearch_mapping('general', '')
199 my $marcflavour = lc C4::Context->preference('marcflavour');
200 $self->_foreach_mapping(
202 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
# Ignore mappings that belong to a different MARC flavour.
203 return if $marc_type ne $marcflavour;
204 # TODO if this gets any sort of complexity to it, it should
205 # be broken out into its own function.
207 # TODO be aware of date formats, but this requires pre-parsing
208 # as ES will simply reject anything with an invalid date.
# Elasticsearch type defaults to 'text'; 'boolean' stays 'boolean',
# 'number' and 'sum' become 'integer'.
209 my $es_type = 'text';
210 if ($type eq 'boolean') {
211 $es_type = 'boolean';
212 } elsif ($type eq 'number' || $type eq 'sum') {
213 $es_type = 'integer';
214 } elsif ($type eq 'isbn' || $type eq 'stdno') {
# Main search property for the field.
218 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
# Companion properties use the "<name>__<suffix>" naming scheme.
221 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
224 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
226 # Sort is a bit special as it can be true, false, undef.
227 # We care about "true" or "undef",
228 # "undef" means to do the default thing, which is make it sortable.
229 if (!defined $sort || $sort) {
230 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
# Remember the field as sortable for this index.
231 $sort_fields{$self->index}{$name} = 1;
235 $all_mappings{$self->index} = $mappings;
# Hand the sortable field names for this index to the accessor.
237 $self->sort_fields(\%{$sort_fields{$self->index}});
239 return $all_mappings{$self->index};
242 =head2 _get_elasticsearch_mapping
244 Get the Elasticsearch mappings for the given purpose and data type.
246 $mapping = _get_elasticsearch_mapping('search', 'text');
# Look up the mapping definition for a ($purpose, $type) pair in the field
# configuration YAML file. Dies when $purpose is not defined in that file.
250 sub _get_elasticsearch_mapping {
252 my ( $purpose, $type ) = @_;
254 # Use state to speed up repeated calls
255 state $settings = undef;
256 if (!defined $settings) {
# Path comes from the 'elasticsearch_field_config' config entry, falling
# back to field_config.yaml below 'intranetdir' when that entry is false.
257 my $config_file = C4::Context->config('elasticsearch_field_config');
258 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
259 $settings = LoadFile( $config_file );
262 if (!defined $settings->{$purpose}) {
263 die "Field purpose $purpose not defined in field config";
266 return $settings->{$purpose};
# Prefer the entry for the exact type ...
268 if (defined $settings->{$purpose}{$type}) {
269 return $settings->{$purpose}{$type};
# ... otherwise use the purpose's 'default' entry when there is one.
271 if (defined $settings->{$purpose}{'default'}) {
272 return $settings->{$purpose}{'default'};
# Load the index -> field definitions from a YAML file and, for each field,
# find or create the Koha::SearchFields row and the Koha::SearchMarcMaps rows
# for its MARC mappings, linking them with add_to_search_marc_maps.
# NOTE(review): $reset_fields is not used in the visible statements, and if
# this sub is invoked as a class method the first argument would be the class
# name rather than a flag - confirm against callers.
277 sub reset_elasticsearch_mappings {
278 my ( $reset_fields ) = @_;
# Path comes from the 'elasticsearch_index_mappings' config entry, falling
# back to mappings.yaml below 'intranetdir' when that entry is false.
279 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
280 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
281 my $indexes = LoadFile( $mappings_yaml );
# Outer level: index name => fields; inner level: field name => definition.
283 while ( my ( $index_name, $fields ) = each %$indexes ) {
284 while ( my ( $field_name, $data ) = each %$fields ) {
285 my $field_type = $data->{type};
286 my $field_label = $data->{label};
287 my $mappings = $data->{mappings};
# Search fields are looked up via the 'name' key.
288 my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
289 for my $mapping ( @$mappings ) {
290 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
# facet and suggestible fall back to 0 when false; sort is passed through
# unchanged, so it may be undef.
291 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
297 # This overrides the accessor provided by Class::Accessor so that if
298 # sort_fields isn't set, then it'll generate it.
302 $self->_sort_fields_accessor(@_);
305 my $val = $self->_sort_fields_accessor();
308 # This will populate the accessor as a side effect
309 $self->get_elasticsearch_mappings();
310 return $self->_sort_fields_accessor();
313 =head2 _process_mappings($mappings, $data, $record_document)
315 Process all C<$mappings> targets operating on a specific MARC field C<$data> applied to C<$record_document>.
316 Since we group all mappings by MARC field targets, C<$mappings> will contain all targets for C<$data>
317 and thus we need to fetch the MARC field only once.
323 Arrayref of mappings containing arrayrefs in the format [C<$target>, C<$options>] where
324 C<$target> is the name of the target field and C<$options> is a hashref containing processing
325 directives for this particular mapping.
329 The source data from a MARC record field.
331 =item C<$record_document>
333 Hashref representing the Elasticsearch document on which mappings should be applied.
# Apply every [$target, $options] mapping to $data and append the resulting
# value to the list stored under $record_document->{$target}.
339 sub _process_mappings {
340 my ($_self, $mappings, $data, $record_document) = @_;
341 foreach my $mapping (@{$mappings}) {
342 my ($target, $options) = @{$mapping};
343 # Copy (scalar) data since can have multiple targets
344 # with differing options for (possibly) mutating data
345 # so need a different copy for each
# Make sure the target holds an arrayref before anything is pushed onto it.
347 $record_document->{$target} //= [];
# 'substr' option is [start, length]; an empty string is used when $data is
# not longer than $start.
348 if (defined $options->{substr}) {
349 my ($start, $length) = @{$options->{substr}};
350 $_data = length($data) > $start ? substr $data, $start, $length : '';
# Run the value through each callback in order: reduce passes the running
# value as $a and the next callback as $b.
352 if (defined $options->{value_callbacks}) {
353 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
# With a 'property' option the value is stored under that property name.
355 if (defined $options->{property}) {
357 $options->{property} => $_data
360 push @{$record_document->{$target}}, $_data;
364 =head2 marc_records_to_documents($marc_records)
366 my @record_documents = $self->marc_records_to_documents($marc_records);
368 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
370 Returns array of hash references, representing Elasticsearch documents,
371 acceptable as body payload in C<Search::Elasticsearch> requests.
375 =item C<$marc_documents>
377 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
# Convert each MARC record in $records into an Elasticsearch document using
# the rules from _get_marc_mapping_rules().
# NOTE(review): the visible code pushes [$id, $record_document] pairs and
# returns a reference to that array, which differs from the "array of hash
# references" wording in the POD - confirm which one callers rely on.
383 sub marc_records_to_documents {
384 my ($self, $records) = @_;
385 my $rules = $self->_get_marc_mapping_rules();
386 my $control_fields_rules = $rules->{control_fields};
387 my $data_fields_rules = $rules->{data_fields};
388 my $marcflavour = lc C4::Context->preference('marcflavour');
390 my @record_documents;
392 foreach my $record (@{$records}) {
393 my $record_document = {};
# Leader rules are applied to the record leader first.
394 my $mappings = $rules->{leader};
396 $self->_process_mappings($mappings, $record->leader(), $record_document);
# Then every field of the record is visited exactly once.
398 foreach my $field ($record->fields()) {
399 if($field->is_control_field()) {
# Control fields: rules are keyed by tag and applied to the field data.
400 my $mappings = $control_fields_rules->{$field->tag()};
402 $self->_process_mappings($mappings, $field->data(), $record_document);
# Data fields: rules are keyed by tag, then by subfield code or group.
406 my $data_field_rules = $data_fields_rules->{$field->tag()};
408 if ($data_field_rules) {
409 my $subfields_mappings = $data_field_rules->{subfields};
# Mappings stored under '*' apply to every subfield of the field.
410 my $wildcard_mappings = $subfields_mappings->{'*'};
411 foreach my $subfield ($field->subfields()) {
412 my ($code, $data) = @{$subfield};
413 my $mappings = $subfields_mappings->{$code} // [];
414 if ($wildcard_mappings) {
# Build a new list so the stored rule arrays are left untouched.
415 $mappings = [@{$mappings}, @{$wildcard_mappings}];
418 $self->_process_mappings($mappings, $data, $record_document);
# Joined subfield groups: each key is a string of subfield codes.
422 my $subfields_join_mappings = $data_field_rules->{subfields_join};
423 if ($subfields_join_mappings) {
424 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
425 # Map each subfield to values, remove empty values, join with space
430 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
434 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document);
# Fill in default values for targets that received no value.
441 foreach my $field (keys %{$rules->{defaults}}) {
442 unless (defined $record_document->{$field}) {
443 $record_document->{$field} = $rules->{defaults}->{$field};
# 'sum' targets: the collected list is replaced by the sum of its values.
446 foreach my $field (@{$rules->{sum}}) {
447 if (defined $record_document->{$field}) {
448 # TODO: validate numeric? filter?
449 # TODO: Or should only accept fields without nested values?
450 # TODO: Quick and dirty, improve if needed
# NOTE(review): the regex is unanchored, so any non-reference value that
# merely contains a digit passes the grep - confirm this is intended.
451 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
454 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
455 $record->encoding('UTF-8');
458 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
459 local $SIG{__WARN__} = sub {
460 push @warnings, $_[0];
# Serialised record: as_usmarc output, UTF-8 encoded, then base64 encoded.
462 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
465 # Suppress warnings if record length exceeded
# The first five leader characters are compared against '99999'.
466 unless (substr($record->leader(), 0, 5) eq '99999') {
467 foreach my $warning (@warnings) {
# MARCXML form of the record; marc_format names the stored format.
471 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
472 $record_document->{'marc_format'} = 'MARCXML';
475 $record_document->{'marc_format'} = 'base64ISO2709';
# The document id is read from subfield 999$c of the record.
477 my $id = $record->subfield('999', 'c');
478 push @record_documents, [$id, $record_document];
480 return \@record_documents;
483 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
485 Get mappings, an internal data structure later used by L<_process_mappings($mappings, $data, $record_document)>
486 to process MARC target data, for a MARC mapping.
488 The returned C<$mappings> is not to be confused with mappings provided by C<_foreach_mapping>, rather this
489 sub accepts properties from a mapping as provided by C<_foreach_mapping> and expands it to this internal
490 data structure. In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings> is then
491 applied to each MARC target (leader, control field data, subfield or joined subfields) and
492 integrated into the mapping rules data structure used in C<marc_records_to_documents> to
493 transform MARC records into Elasticsearch documents.
499 Boolean indicating whether to create a facet field for this mapping.
501 =item C<$suggestible>
503 Boolean indicating whether to create a suggestion field for this mapping.
507 Boolean indicating whether to create a sort field for this mapping.
509 =item C<$target_name>
511 Elasticsearch document target field name.
513 =item C<$target_type>
515 Elasticsearch document target field type.
519 An optional range as a string in the format "<START>-<END>" or "<START>",
520 where "<START>" and "<END>" are integers specifying a range that will be used
521 for extracting a substring from MARC data as Elasticsearch field target value.
523 The first character position is "0", and the range is inclusive,
524 so "0-2" means the first three characters of MARC data.
526 If only "<START>" is provided only one character at position "<START>" will
# Expand one stored mapping into the list of [$target, $options] pairs that
# _process_mappings consumes: one for the target itself plus one for each
# "<target>__<suffix>" companion (facet, suggestion, sort).
533 sub _field_mappings {
534 my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
# NOTE(review): %mapping_defaults is not referenced in the visible
# statements - possibly a leftover.
535 my %mapping_defaults = ();
538 my $substr_args = undef;
540 # TODO: use value_callback instead?
# "<START>-<END>" or "<START>" is turned into [start, length]; the length
# is end - start + 1 for a range and 1 for a single position.
541 my ($start, $end) = map(int, split /-/, $range, 2);
542 $substr_args = [$start];
543 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
545 my $default_options = {};
547 $default_options->{substr} = $substr_args;
550 # TODO: Should probably have per type value callback/hook
551 # but hard code for now
# Boolean targets get a callback that maps the trimmed value to the
# strings 'true' / 'false' according to Perl truthiness.
552 if ($target_type eq 'boolean') {
553 $default_options->{value_callbacks} //= [];
554 push @{$default_options->{value_callbacks}}, sub {
556 # Trim whitespace at both ends
557 $value =~ s/^\s+|\s+$//g;
558 return $value ? 'true' : 'false';
# Mapping for the target field itself.
562 my $mapping = [$target_name, $default_options];
563 push @mappings, $mapping;
# An undefined $sort counts as sortable, same as a true value.
566 push @suffixes, 'facet' if $facet;
567 push @suffixes, 'suggestion' if $suggestible;
568 push @suffixes, 'sort' if !defined $sort || $sort;
570 foreach my $suffix (@suffixes) {
571 my $mapping = ["${target_name}__$suffix"];
572 # TODO: Hack, fix later in less hideous manner
# Suggestion targets get a copy of the options with property => 'input';
# the other suffixes share the $default_options hashref.
573 if ($suffix eq 'suggestion') {
574 push @{$mapping}, {%{$default_options}, property => 'input'};
577 push @{$mapping}, $default_options;
579 push @mappings, $mapping;
584 =head2 _get_marc_mapping_rules
586 my $mapping_rules = $self->_get_marc_mapping_rules()
588 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
590 Since field retrieval is slow in C<MARC::Records> (all fields are iterated through for
591 each call to C<MARC::Record>->field) we create an optimized structure of mapping
592 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
594 We can then iterate through all MARC fields for each record and apply all relevant
595 rules once per fields instead of retrieving fields multiple times for each mapping rule
596 which is terribly slow.
600 # TODO: This structure can be used for processing multiple MARC::Records so is currently
601 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
602 # memory cache which it is currently not. The performance gain of caching
603 # would probably be marginal, but to do this could be a further improvement.
# Build the rules structure (leader, control_fields, data_fields, defaults,
# sum) from the stored mappings of the current MARC flavour. Dies on
# unbalanced parentheses in a field spec and on specs matching neither pattern.
605 sub _get_marc_mapping_rules {
608 my $marcflavour = lc C4::Context->preference('marcflavour');
# Field spec: three-digit tag, optional run of subfield codes and
# parentheses, optional "_/<start>[-<end>]" range.
611 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
# Leader spec: the word "leader" with the same optional range suffix.
612 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
615 'control_fields' => {},
621 $self->_foreach_mapping(sub {
622 my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
# Ignore mappings that belong to a different MARC flavour.
623 return if $marc_type ne $marcflavour;
# 'sum' targets are collected so their values can be summed later.
625 if ($type eq 'sum') {
626 push @{$rules->{sum}}, $name;
628 elsif($type eq 'boolean') {
629 # boolean gets special handling, if value doesn't exist for a field,
631 $rules->{defaults}->{$name} = 'false';
634 if ($marc_field =~ $field_spec_regexp) {
639 # Parse and separate subfields from subfield groups
641 my $subfield_group = '';
# Walk the subfield part ($2) one character at a time.
644 foreach my $token (split //, $2) {
647 die("Unmatched opening parenthesis for $marc_field");
653 elsif ($token eq ")") {
# A closing parenthesis ends the current group; empty groups are dropped.
655 if ($subfield_group) {
656 push @subfield_groups, $subfield_group;
657 $subfield_group = '';
662 die("Unmatched closing parenthesis for $marc_field");
# Inside a group, characters accumulate into the group string ...
665 elsif ($open_group) {
666 $subfield_group .= $token;
# ... outside a group each character is a single subfield code.
669 push @subfields, $token;
# '*' is the wildcard key for subfield mappings.
674 push @subfields, '*';
677 my $range = defined $3 ? $3 : undef;
678 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
# Tags below 10 are stored as control-field rules, keyed by tag.
680 if ($field_tag < 10) {
681 $rules->{control_fields}->{$field_tag} //= [];
682 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
# Other tags are data-field rules, keyed by tag and then by subfield code
# ('subfields') or by group string ('subfields_join').
685 $rules->{data_fields}->{$field_tag} //= {};
686 foreach my $subfield (@subfields) {
687 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
688 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
690 foreach my $subfield_group (@subfield_groups) {
691 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
692 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
696 elsif ($marc_field =~ $leader_regexp) {
# For leader specs the range is the first capture group.
697 my $range = defined $1 ? $1 : undef;
698 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
699 push @{$rules->{leader}}, @mappings;
702 die("Invalid MARC field: $marc_field");
708 =head2 _foreach_mapping
710 $self->_foreach_mapping(
712 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
715 return unless $marc_type eq 'marc21';
716 print "Data comes from: " . $marc_field . "\n";
720 This allows you to apply a function to each entry in the elasticsearch mappings
721 table, in order to build the mappings for whatever is needed.
723 In the provided function, the fields are:
729 The field name for elasticsearch (corresponds to the 'mapping' column in the
734 The type for this value, e.g. 'string'.
738 True if this value should be facetised. This only really makes sense if the
739 field is understood by the facet processing code anyway.
743 True if this is a field that a) needs special sort handling, and b) if it
744 should be sorted on. False if a) but not b). Undef if not a). This allows,
745 for example, author to be sorted on but not everything marked with "author"
746 to be included in that sort.
750 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
751 'unimarc', 'normarc'.
755 A string that describes the MARC field that contains the data to extract.
756 These are of a form suited to Catmandu's MARC fixers.
# Query the SearchField resultset for every mapping that belongs to this
# object's index and iterate over the rows, reading the joined mapping
# columns of each row.
762 sub _foreach_mapping {
763 my ( $self, $sub ) = @_;
765 # TODO use a caching framework here
766 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
# Restrict the result to mappings of this object's index.
768 'search_marc_map.index_name' => $self->index,
# Join through search_marc_to_fields to search_marc_map so that the
# columns listed below can be selected.
770 { join => { search_marc_to_fields => 'search_marc_map' },
772 'search_marc_to_fields.facet',
773 'search_marc_to_fields.suggestible',
774 'search_marc_to_fields.sort',
775 'search_marc_map.marc_type',
776 'search_marc_map.marc_field',
# One row per (search field, MARC mapping) combination.
788 while ( my $search_field = $search_fields->next ) {
# Values read back from the current row.
792 $search_field->get_column('facet'),
793 $search_field->get_column('suggestible'),
794 $search_field->get_column('sort'),
795 $search_field->get_column('marc_type'),
796 $search_field->get_column('marc_field'),
803 die process_error($@);
805 This parses an Elasticsearch error message and produces a human-readable
806 result from it. This result is probably missing all the useful information
807 that you might want in diagnosing an issue, so the warning is also logged.
809 Note that currently the resulting message is not internationalised. This
810 will happen eventually by some method or other.
815 my ($self, $msg) = @_;
817 warn $msg; # simple logging
819 # This is super-primitive
820 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
822 return "Unable to perform your search. Please try again.\n";
825 =head2 _read_configuration
827 my $conf = _read_configuration();
829 Reads the I<configuration file> and returns a hash structure with the
830 configuration information. It raises an exception if mandatory entries
833 The hashref structure has the following form:
836 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
837 'index_name' => 'koha_instance',
840 This is configured by the following in the C<config> block in koha-conf.xml:
843 <server>127.0.0.1:9200</server>
844 <server>anotherserver:9200</server>
845 <index_name>koha_instance</index_name>
# Read the 'elasticsearch' entry of the Koha configuration and return a
# hashref holding 'nodes' (always an arrayref) and 'index_name'.
# Throws Koha::Exceptions::Config::MissingEntry when the block, the 'server'
# entry or the 'index_name' entry is missing.
850 sub _read_configuration {
854 my $conf = C4::Context->config('elasticsearch');
855 Koha::Exceptions::Config::MissingEntry->throw(
856 "Missing 'elasticsearch' block in config file")
857 unless defined $conf;
859 if ( $conf && $conf->{server} ) {
860 my $nodes = $conf->{server};
# Several <server> entries already arrive as an arrayref ...
861 if ( ref($nodes) eq 'ARRAY' ) {
862 $configuration->{nodes} = $nodes;
# ... any other value is wrapped so 'nodes' is always an arrayref.
865 $configuration->{nodes} = [$nodes];
869 Koha::Exceptions::Config::MissingEntry->throw(
870 "Missing 'server' entry in config file for elasticsearch");
# index_name is copied unchanged; no per-index suffix is added here.
873 if ( defined $conf->{index_name} ) {
874 $configuration->{index_name} = $conf->{index_name};
877 Koha::Exceptions::Config::MissingEntry->throw(
878 "Missing 'index_name' entry in config file for elasticsearch");
881 return $configuration;
892 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
894 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
896 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>