use Data::Dump qw/dump/;

# Command line: Riak base URL, source table name, DBI DSN (all optional).
my ( $riak_url, $table, $dbi ) = @ARGV;

# Fall back to the local development defaults when an argument is omitted.
$riak_url ||= 'http://10.60.0.92:8098';
$table    ||= 'biblioitems';
$dbi      ||= 'DBI:mysql:dbname=koha;host=10.60.0.10;port=3306';

my $batch_size = 1000;                # rows fetched per SELECT batch
my $offset     = $ENV{OFFSET} || 0;   # resume point, overridable via environment

my $dbh  = DBI->connect( $dbi, "", "" ) or die $DBI::errstr;
my $riak = RiakSearch->new($riak_url);
#$riak->{args} = 'w=1';
# Install the riak_search_kv_hook precommit hook on a bucket so that objects
# PUT into it are automatically indexed by Riak Search.
# NOTE(review): this is an excerpt -- the `my $bucket = shift;` line and the
# closing braces of the props hash / request call / sub body are not visible
# in this view, so the sub is documented but left untouched.
sub riak_search_kv_hook {
$riak->request( 'PUT' => $bucket, { props => {
# register the Riak Search KV indexing hook as a precommit hook
precommit => [ { mod => 'riak_search_kv_hook', fun => 'precommit' } ],
# last_write_wins => 'true',
# NOTE(review): the GET path below is hard-coded to '/koha.marcxml', so when
# this sub is called for "koha.$table" the warn dumps the WRONG bucket's
# props; presumably it should GET $bucket instead -- confirm before fixing.
warn "riak_search_kv_hook $bucket ", $riak->request( 'GET' => '/koha.marcxml' );
# Ensure both target buckets index incoming objects via Riak Search
# (same order as before: marcxml bucket first, then the table bucket).
riak_search_kv_hook($_) for ( 'koha.marcxml', "koha.$table" );
# Build and run the SELECT for the current batch window.
my $limit = "LIMIT $batch_size OFFSET $offset";
warn "SELECT * FROM $table $limit\n";

my $sth = $dbh->prepare(qq{ select * from $table $limit}) or die $dbh->errstr();
$sth->execute or die $sth->errstr();

# Primary key column(s) of the table -- joined later into the Riak object key.
my @pk = $dbh->primary_key( undef, undef, $table );

# NOTE: $sth->rows after execute is normally unreliable for SELECTs, but
# DBD::mysql stores the whole result set client-side, so it works here.
print "import ", $sth->rows, " rows from $table pk:", dump( @pk ), "...\n";
# Import every fetched row into Riak as one object (plus a linked MARCXML
# object when present).
# NOTE(review): this excerpt is missing several interior lines -- the closing
# of the marcxml PUT header hash, the $headers initialization, the foreach /
# while closing braces, and the outer batch loop with its $offset increment.
while (my $row = $sth->fetchrow_hashref() ) {
# Riak object key: the row's primary-key values joined with '_'.
my $key = join('_', map { $row->{$_} } @pk);
my $biblionumber = $row->{biblionumber};
# Store the MARCXML blob as its own object under koha.marcxml, linked back
# to the owning row; delete it from $row so it isn't duplicated there.
if ( my $marcxml = delete $row->{marcxml} ) {
$riak->request( 'PUT' => "/koha.marcxml/$key", $marcxml, {
'Content-Type' => 'text/xml',
# Riak link header pointing back at the row object in koha.$table.
'Link' => qq|</riak/koha.$table/$biblionumber>; riaktag="biblio"|,
# warn "## $key ",dump($row);
# Collect Link headers to related Koha objects for this row.
# NOTE(review): $headers is populated below, but its (re)initialization is
# in lines not shown here -- confirm it is reset for each row.
foreach my $reference ( qw(biblio biblioitems borrowers items) ) {
# Derive the FK column: strip a trailing 's' ("items" -> "item" + 'number').
# NOTE(review): `my $var = $1 if /.../` is the notorious stateful-my idiom;
# if the match ever failed, $singular could silently keep a stale value.
my $singular = $1 if $reference =~ m/^(\w+?)s?$/;
my $col = $singular . 'number';
# Skip the column that already forms (part of) this object's own key.
next if $key =~ m/$col/;
my $number = $row->{$col} || next;
push @{ $headers->{Link} }, qq|</riak/koha.$reference/$number>; riaktag="$reference"|;
# Store the row itself (sans marcxml) with its accumulated link headers.
$riak->request( 'PUT' => "/koha.$table/$key", $row, $headers );
# A short final batch means the table is exhausted.
# NOTE(review): `last` targets an enclosing batch loop outside this excerpt.
last if $sth->rows < $batch_size;
warn "END total_rows $offset\n";