1c5fc29d4be45eb72486eb5341d154e9c79f3587
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use Search::Elasticsearch;
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     croak('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new(
90             client => "5_0::Direct",
91             nodes => $conf->{nodes},
92             cxn_pool => 'Sniff',
93             request_timeout => 60
94         );
95     }
96     return $self->{elasticsearch};
97 }
98
99 =head2 get_elasticsearch_params
100
101     my $params = $self->get_elasticsearch_params();
102
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
105
106     {
107         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108         'index_name' => 'koha_instance_index',
109     }
110
111 This is configured by the following in the C<config> block in koha-conf.xml:
112
113     <elasticsearch>
114         <server>127.0.0.1:9200</server>
115         <server>anotherserver:9200</server>
116         <index_name>koha_instance</index_name>
117     </elasticsearch>
118
119 =cut
120
121 sub get_elasticsearch_params {
122     my ($self) = @_;
123
124     # Copy the hash so that we're not modifying the original
125     my $conf = C4::Context->config('elasticsearch');
126     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127     my $es = { %{ $conf } };
128
129     # Helpfully, the multiple server lines end up in an array for us anyway
130     # if there are multiple ones, but not if there's only one.
131     my $server = $es->{server};
132     delete $es->{server};
133     if ( ref($server) eq 'ARRAY' ) {
134
135         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136         $es->{nodes} = $server;
137     }
138     elsif ($server) {
139         $es->{nodes} = [$server];
140     }
141     else {
142         die "No elasticsearch servers were specified in koha-conf.xml.\n";
143     }
144     die "No elasticserver index_name was specified in koha-conf.xml.\n"
145       if ( !$es->{index_name} );
146     # Append the name of this particular index to our namespace
147     $es->{index_name} .= '_' . $self->index;
148
149     $es->{key_prefix} = 'es_';
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         my $mappings = {
197             data => scalar _get_elasticsearch_mapping('general', '')
198         };
199         my $marcflavour = lc C4::Context->preference('marcflavour');
200         $self->_foreach_mapping(
201             sub {
202                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
203                 return if $marc_type ne $marcflavour;
204                 # TODO if this gets any sort of complexity to it, it should
205                 # be broken out into its own function.
206
207                 # TODO be aware of date formats, but this requires pre-parsing
208                 # as ES will simply reject anything with an invalid date.
209                 my $es_type = 'text';
210                 if ($type eq 'boolean') {
211                     $es_type = 'boolean';
212                 } elsif ($type eq 'number' || $type eq 'sum') {
213                     $es_type = 'integer';
214                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
215                     $es_type = 'stdno';
216                 }
217
218                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $all_mappings{$self->index} = $mappings;
236     }
237     $self->sort_fields(\%{$sort_fields{$self->index}});
238
239     return $all_mappings{$self->index};
240 }
241
242 =head2 _get_elasticsearch_mapping
243
244 Get the Elasticsearch mappings for the given purpose and data type.
245
246 $mapping = _get_elasticsearch_mapping('search', 'text');
247
248 =cut
249
250 sub _get_elasticsearch_mapping {
251
252     my ( $purpose, $type ) = @_;
253
254     # Use state to speed up repeated calls
255     state $settings = undef;
256     if (!defined $settings) {
257         my $config_file = C4::Context->config('elasticsearch_field_config');
258         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
259         $settings = LoadFile( $config_file );
260     }
261
262     if (!defined $settings->{$purpose}) {
263         die "Field purpose $purpose not defined in field config";
264     }
265     if ($type eq '') {
266         return $settings->{$purpose};
267     }
268     if (defined $settings->{$purpose}{$type}) {
269         return $settings->{$purpose}{$type};
270     }
271     if (defined $settings->{$purpose}{'default'}) {
272         return $settings->{$purpose}{'default'};
273     }
274     return;
275 }
276
277 sub reset_elasticsearch_mappings {
278     my ( $reset_fields ) = @_;
279     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
280     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
281     my $indexes = LoadFile( $mappings_yaml );
282
283     while ( my ( $index_name, $fields ) = each %$indexes ) {
284         while ( my ( $field_name, $data ) = each %$fields ) {
285             my $field_type = $data->{type};
286             my $field_label = $data->{label};
287             my $mappings = $data->{mappings};
288             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
289             for my $mapping ( @$mappings ) {
290                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
291                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
292             }
293         }
294     }
295 }
296
297 # This overrides the accessor provided by Class::Accessor so that if
298 # sort_fields isn't set, then it'll generate it.
299 sub sort_fields {
300     my $self = shift;
301     if (@_) {
302         $self->_sort_fields_accessor(@_);
303         return;
304     }
305     my $val = $self->_sort_fields_accessor();
306     return $val if $val;
307
308     # This will populate the accessor as a side effect
309     $self->get_elasticsearch_mappings();
310     return $self->_sort_fields_accessor();
311 }
312
313 =head2 _process_mappings($mappings, $data, $record_document)
314
315 Process all C<$mappings> targets operating on a specific MARC field C<$data> applied to C<$record_document>
316 Since we group all mappings by MARC field targets C<$mappings> will contain all targets for C<$data>
317 and thus we need to fetch the MARC field only once.
318
319 =over 4
320
321 =item C<$mappings>
322
323 Arrayref of mappings containing arrayrefs on the format [C<$taget>, C<$options>] where
324 C<$target> is the name of the target field and C<$options> is a hashref containing processing
325 directives for this particular mapping.
326
327 =item C<$data>
328
329 The source data from a MARC record field.
330
331 =item C<$record_document>
332
333 Hashref representing the  Elasticsearch document on which mappings should be applied.
334
335 =back
336
337 =cut
338
339 sub _process_mappings {
340     my ($_self, $mappings, $data, $record_document) = @_;
341     foreach my $mapping (@{$mappings}) {
342         my ($target, $options) = @{$mapping};
343         # Copy (scalar) data since can have multiple targets
344         # with differing options for (possibly) mutating data
345         # so need a different copy for each
346         my $_data = $data;
347         $record_document->{$target} //= [];
348         if (defined $options->{substr}) {
349             my ($start, $length) = @{$options->{substr}};
350             $_data = length($data) > $start ? substr $data, $start, $length : '';
351         }
352         if (defined $options->{value_callbacks}) {
353             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
354         }
355         if (defined $options->{property}) {
356             $_data = {
357                 $options->{property} => $_data
358             }
359         }
360         push @{$record_document->{$target}}, $_data;
361     }
362 }
363
364 =head2 marc_records_to_documents($marc_records)
365
366     my @record_documents = $self->marc_records_to_documents($marc_records);
367
368 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
369
370 Returns array of hash references, representing Elasticsearch documents,
371 acceptable as body payload in C<Search::Elasticsearch> requests.
372
373 =over 4
374
375 =item C<$marc_documents>
376
377 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
378
379 =back
380
381 =cut
382
383 sub marc_records_to_documents {
384     my ($self, $records) = @_;
385     my $rules = $self->_get_marc_mapping_rules();
386     my $control_fields_rules = $rules->{control_fields};
387     my $data_fields_rules = $rules->{data_fields};
388     my $marcflavour = lc C4::Context->preference('marcflavour');
389
390     my @record_documents;
391
392     foreach my $record (@{$records}) {
393         my $record_document = {};
394         my $mappings = $rules->{leader};
395         if ($mappings) {
396             $self->_process_mappings($mappings, $record->leader(), $record_document);
397         }
398         foreach my $field ($record->fields()) {
399             if($field->is_control_field()) {
400                 my $mappings = $control_fields_rules->{$field->tag()};
401                 if ($mappings) {
402                     $self->_process_mappings($mappings, $field->data(), $record_document);
403                 }
404             }
405             else {
406                 my $data_field_rules = $data_fields_rules->{$field->tag()};
407
408                 if ($data_field_rules) {
409                     my $subfields_mappings = $data_field_rules->{subfields};
410                     my $wildcard_mappings = $subfields_mappings->{'*'};
411                     foreach my $subfield ($field->subfields()) {
412                         my ($code, $data) = @{$subfield};
413                         my $mappings = $subfields_mappings->{$code} // [];
414                         if ($wildcard_mappings) {
415                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
416                         }
417                         if (@{$mappings}) {
418                             $self->_process_mappings($mappings, $data, $record_document);
419                         }
420                     }
421
422                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
423                     if ($subfields_join_mappings) {
424                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
425                             # Map each subfield to values, remove empty values, join with space
426                             my $data = join(
427                                 ' ',
428                                 grep(
429                                     $_,
430                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
431                                 )
432                             );
433                             if ($data) {
434                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document);
435                             }
436                         }
437                     }
438                 }
439             }
440         }
441         foreach my $field (keys %{$rules->{defaults}}) {
442             unless (defined $record_document->{$field}) {
443                 $record_document->{$field} = $rules->{defaults}->{$field};
444             }
445         }
446         foreach my $field (@{$rules->{sum}}) {
447             if (defined $record_document->{$field}) {
448                 # TODO: validate numeric? filter?
449                 # TODO: Or should only accept fields without nested values?
450                 # TODO: Quick and dirty, improve if needed
451                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
452             }
453         }
454         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
455         $record->encoding('UTF-8');
456         my @warnings;
457         {
458             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
459             local $SIG{__WARN__} = sub {
460                 push @warnings, $_[0];
461             };
462             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
463         }
464         if (@warnings) {
465             # Suppress warnings if record length exceeded
466             unless (substr($record->leader(), 0, 5) eq '99999') {
467                 foreach my $warning (@warnings) {
468                     carp($warning);
469                 }
470             }
471             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
472             $record_document->{'marc_format'} = 'MARCXML';
473         }
474         else {
475             $record_document->{'marc_format'} = 'base64ISO2709';
476         }
477         my $id = $record->subfield('999', 'c');
478         push @record_documents, [$id, $record_document];
479     }
480     return \@record_documents;
481 }
482
483 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
484
485 Get mappings, an internal data structure later used by L<_process_mappings($mappings, $data, $record_document)>
486 to process MARC target data, for a MARC mapping.
487
488 The returned C<$mappings> is to to be confused  with mappings provided by C<_foreach_mapping>, rather this
489 sub accepts properties from a mapping as provided by C<_foreach_mapping> and expands it to this internal
490 data stucture. In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings> is then
491 applied to each MARC target (leader, control field data, subfield or joined subfields) and
492 integrated into the mapping rules data structure used in C<marc_records_to_documents> to
493 transform MARC records into Elasticsearch documents.
494
495 =over 4
496
497 =item C<$facet>
498
499 Boolean indicating whether to create a facet field for this mapping.
500
501 =item C<$suggestible>
502
503 Boolean indicating whether to create a suggestion field for this mapping.
504
505 =item C<$sort>
506
507 Boolean indicating whether to create a sort field for this mapping.
508
509 =item C<$target_name>
510
511 Elasticsearch document target field name.
512
513 =item C<$target_type>
514
515 Elasticsearch document target field type.
516
517 =item C<$range>
518
519 An optinal range as a string on the format "<START>-<END>" or "<START>",
520 where "<START>" and "<END>" are integers specifying a range that will be used
521 for extracting a substing from MARC data as Elasticsearch field target value.
522
523 The first character position is "1", and the range is inclusive,
524 so "1-3" means the first three characters of MARC data.
525
526 If only "<START>" is provided only one character as position "<START>" will
527 be extracted.
528
529 =back
530
531 =cut
532
533 sub _field_mappings {
534     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
535     my %mapping_defaults = ();
536     my @mappings;
537
538     my $substr_args = undef;
539     if ($range) {
540         # TODO: use value_callback instead?
541         my ($start, $end) = map(int, split /-/, $range, 2);
542         $substr_args = [$start];
543         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
544     }
545     my $default_options = {};
546     if ($substr_args) {
547         $default_options->{substr} = $substr_args;
548     }
549
550     # TODO: Should probably have per type value callback/hook
551     # but hard code for now
552     if ($target_type eq 'boolean') {
553         $default_options->{value_callbacks} //= [];
554         push @{$default_options->{value_callbacks}}, sub {
555             my ($value) = @_;
556             # Trim whitespace at both ends
557             $value =~ s/^\s+|\s+$//g;
558             return $value ? 'true' : 'false';
559         };
560     }
561
562     my $mapping = [$target_name, $default_options];
563     push @mappings, $mapping;
564
565     my @suffixes = ();
566     push @suffixes, 'facet' if $facet;
567     push @suffixes, 'suggestion' if $suggestible;
568     push @suffixes, 'sort' if !defined $sort || $sort;
569
570     foreach my $suffix (@suffixes) {
571         my $mapping = ["${target_name}__$suffix"];
572         # TODO: Hack, fix later in less hideous manner
573         if ($suffix eq 'suggestion') {
574             push @{$mapping}, {%{$default_options}, property => 'input'};
575         }
576         else {
577             push @{$mapping}, $default_options;
578         }
579         push @mappings, $mapping;
580     }
581     return @mappings;
582 };
583
584 =head2 _get_marc_mapping_rules
585
586     my $mapping_rules = $self->_get_marc_mapping_rules()
587
588 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
589
590 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
591 each call to C<MARC::Record>->field) we create an optimized structure of mapping
592 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
593
594 We can then iterate through all MARC fields for each record and apply all relevant
595 rules once per fields instead of retreiving fields multiple times for each mapping rule
596 wich is terribly slow.
597
598 =cut
599
600 # TODO: This structure can be used for processing multiple MARC::Records so is currently
601 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
602 # memory cache which it is currently not. The performance gain of caching
603 # would probably be marginal, but to do this could be a further improvement.
604
605 sub _get_marc_mapping_rules {
606     my ($self) = @_;
607
608     my $marcflavour = lc C4::Context->preference('marcflavour');
609     my @rules;
610
611     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
612     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
613     my $rules = {
614         'leader' => [],
615         'control_fields' => {},
616         'data_fields' => {},
617         'sum' => [],
618         'defaults' => {}
619     };
620
621     $self->_foreach_mapping(sub {
622         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
623         return if $marc_type ne $marcflavour;
624
625         if ($type eq 'sum') {
626             push @{$rules->{sum}}, $name;
627         }
628         elsif($type eq 'boolean') {
629             # boolean gets special handling, if value doesn't exist for a field,
630             # it is set to false
631             $rules->{defaults}->{$name} = 'false';
632         }
633
634         if ($marc_field =~ $field_spec_regexp) {
635             my $field_tag = $1;
636
637             my @subfields;
638             my @subfield_groups;
639             # Parse and separate subfields form subfield groups
640             if (defined $2) {
641                 my $subfield_group = '';
642                 my $open_group = 0;
643
644                 foreach my $token (split //, $2) {
645                     if ($token eq "(") {
646                         if ($open_group) {
647                             die("Unmatched opening parenthesis for $marc_field");
648                         }
649                         else {
650                             $open_group = 1;
651                         }
652                     }
653                     elsif ($token eq ")") {
654                         if ($open_group) {
655                             if ($subfield_group) {
656                                 push @subfield_groups, $subfield_group;
657                                 $subfield_group = '';
658                             }
659                             $open_group = 0;
660                         }
661                         else {
662                             die("Unmatched closing parenthesis for $marc_field");
663                         }
664                     }
665                     elsif ($open_group) {
666                         $subfield_group .= $token;
667                     }
668                     else {
669                         push @subfields, $token;
670                     }
671                 }
672             }
673             else {
674                 push @subfields, '*';
675             }
676
677             my $range = defined $3 ? $3 : undef;
678             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
679
680             if ($field_tag < 10) {
681                 $rules->{control_fields}->{$field_tag} //= [];
682                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
683             }
684             else {
685                 $rules->{data_fields}->{$field_tag} //= {};
686                 foreach my $subfield (@subfields) {
687                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
688                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
689                 }
690                 foreach my $subfield_group (@subfield_groups) {
691                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
692                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
693                 }
694             }
695         }
696         elsif ($marc_field =~ $leader_regexp) {
697             my $range = defined $1 ? $1 : undef;
698             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
699             push @{$rules->{leader}}, @mappings;
700         }
701         else {
702             die("Invalid MARC field: $marc_field");
703         }
704     });
705     return $rules;
706 }
707
708 =head2 _foreach_mapping
709
710     $self->_foreach_mapping(
711         sub {
712             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
713                 $marc_field )
714               = @_;
715             return unless $marc_type eq 'marc21';
716             print "Data comes from: " . $marc_field . "\n";
717         }
718     );
719
720 This allows you to apply a function to each entry in the elasticsearch mappings
721 table, in order to build the mappings for whatever is needed.
722
723 In the provided function, the files are:
724
725 =over 4
726
727 =item C<$name>
728
729 The field name for elasticsearch (corresponds to the 'mapping' column in the
730 database.
731
732 =item C<$type>
733
734 The type for this value, e.g. 'string'.
735
736 =item C<$facet>
737
738 True if this value should be facetised. This only really makes sense if the
739 field is understood by the facet processing code anyway.
740
741 =item C<$sort>
742
743 True if this is a field that a) needs special sort handling, and b) if it
744 should be sorted on. False if a) but not b). Undef if not a). This allows,
745 for example, author to be sorted on but not everything marked with "author"
746 to be included in that sort.
747
748 =item C<$marc_type>
749
750 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
751 'unimarc', 'normarc'.
752
753 =item C<$marc_field>
754
755 A string that describes the MARC field that contains the data to extract.
756 These are of a form suited to Catmandu's MARC fixers.
757
758 =back
759
760 =cut
761
762 sub _foreach_mapping {
763     my ( $self, $sub ) = @_;
764
765     # TODO use a caching framework here
766     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
767         {
768             'search_marc_map.index_name' => $self->index,
769         },
770         {   join => { search_marc_to_fields => 'search_marc_map' },
771             '+select' => [
772                 'search_marc_to_fields.facet',
773                 'search_marc_to_fields.suggestible',
774                 'search_marc_to_fields.sort',
775                 'search_marc_map.marc_type',
776                 'search_marc_map.marc_field',
777             ],
778             '+as'     => [
779                 'facet',
780                 'suggestible',
781                 'sort',
782                 'marc_type',
783                 'marc_field',
784             ],
785         }
786     );
787
788     while ( my $search_field = $search_fields->next ) {
789         $sub->(
790             $search_field->name,
791             $search_field->type,
792             $search_field->get_column('facet'),
793             $search_field->get_column('suggestible'),
794             $search_field->get_column('sort'),
795             $search_field->get_column('marc_type'),
796             $search_field->get_column('marc_field'),
797         );
798     }
799 }
800
801 =head2 process_error
802
803     die process_error($@);
804
805 This parses an Elasticsearch error message and produces a human-readable
806 result from it. This result is probably missing all the useful information
807 that you might want in diagnosing an issue, so the warning is also logged.
808
809 Note that currently the resulting message is not internationalised. This
810 will happen eventually by some method or other.
811
812 =cut
813
814 sub process_error {
815     my ($self, $msg) = @_;
816
817     warn $msg; # simple logging
818
819     # This is super-primitive
820     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
821
822     return "Unable to perform your search. Please try again.\n";
823 }
824
825 =head2 _read_configuration
826
827     my $conf = _read_configuration();
828
829 Reads the I<configuration file> and returns a hash structure with the
830 configuration information. It raises an exception if mandatory entries
831 are missing.
832
833 The hashref structure has the following form:
834
835     {
836         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
837         'index_name' => 'koha_instance',
838     }
839
840 This is configured by the following in the C<config> block in koha-conf.xml:
841
842     <elasticsearch>
843         <server>127.0.0.1:9200</server>
844         <server>anotherserver:9200</server>
845         <index_name>koha_instance</index_name>
846     </elasticsearch>
847
848 =cut
849
850 sub _read_configuration {
851
852     my $configuration;
853
854     my $conf = C4::Context->config('elasticsearch');
855     Koha::Exceptions::Config::MissingEntry->throw(
856         "Missing 'elasticsearch' block in config file")
857       unless defined $conf;
858
859     if ( $conf && $conf->{server} ) {
860         my $nodes = $conf->{server};
861         if ( ref($nodes) eq 'ARRAY' ) {
862             $configuration->{nodes} = $nodes;
863         }
864         else {
865             $configuration->{nodes} = [$nodes];
866         }
867     }
868     else {
869         Koha::Exceptions::Config::MissingEntry->throw(
870             "Missing 'server' entry in config file for elasticsearch");
871     }
872
873     if ( defined $conf->{index_name} ) {
874         $configuration->{index_name} = $conf->{index_name};
875     }
876     else {
877         Koha::Exceptions::Config::MissingEntry->throw(
878             "Missing 'index_name' entry in config file for elasticsearch");
879     }
880
881     return $configuration;
882 }
883
884 1;
885
886 __END__
887
888 =head1 AUTHOR
889
890 =over 4
891
892 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
893
894 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
895
896 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
897
898 =back
899
900 =cut