use Data::Dump qw/dump/;

# Usage: import.pl [riak_url] [table] [dbi_dsn]
# All three arguments are optional and fall back to site-local defaults below.
my ( $riak_url, $table, $dbi ) = @ARGV;

$riak_url ||= 'http://10.60.0.92:8098';                          # Riak HTTP endpoint
$table    ||= 'biblioitems';                                     # Koha table to import
$dbi      ||= 'DBI:mysql:dbname=koha;host=10.60.0.10;port=3306'; # source database DSN

# Number of rows fetched (and imported) per SELECT; also the loop
# termination condition — a short batch means we reached the end.
my $batch_size = 1000;

my $dbh  = DBI->connect( $dbi, "", "" ) or die $DBI::errstr;
my $riak = RiakSearch->new( $riak_url );
#$riak->{args} = 'w=2';
# Install the Riak Search KV precommit hook on a bucket, so that every
# value stored into it afterwards is indexed by Riak Search.
# Takes the bucket name (e.g. 'koha.marcxml'); returns nothing useful.
sub riak_search_kv_hook {
    my ($bucket) = @_;    # NOTE(review): declaration reconstructed — L24 uses $bucket

    $riak->request( 'PUT' => $bucket, { props => {
        precommit => [ { mod => 'riak_search_kv_hook', fun => 'precommit' } ],
        # last_write_wins => 'true',
    } } );

    # NOTE(review): always GETs /koha.marcxml regardless of $bucket — looks
    # like leftover debugging output; confirm before relying on it.
    warn "riak_search_kv_hook $bucket ", $riak->request( 'GET' => '/koha.marcxml' );
}

riak_search_kv_hook 'koha.marcxml';
riak_search_kv_hook "koha.$table";
# Page through $table in $batch_size chunks, PUT every row into Riak as
# "/koha.$table/$pk", and split out the marcxml column into its own
# "/koha.marcxml/$pk" object linked back to the row.
my $offset = 0;    # NOTE(review): reconstructed — rows imported so far / SQL OFFSET

while (1) {

    my $limit = "LIMIT $batch_size OFFSET $offset";
    warn "SELECT * FROM $table $limit\n";

    # XXX: $table comes from @ARGV and is interpolated into SQL — fine for an
    # operator-run import script, but this is injectable; do not expose.
    my $sth = $dbh->prepare(qq{ select * from $table $limit}) or die $dbh->errstr();
    $sth->execute or die $sth->errstr();

    # Primary key column(s) — used to build a stable Riak object key.
    my @pk = $dbh->primary_key( undef, undef, $table );

    print "import ", $sth->rows, " rows from $table pk:", dump( @pk ), "...\n";

    while ( my $row = $sth->fetchrow_hashref() ) {

        # Composite keys are joined with '_'.
        my $key          = join( '_', map { $row->{$_} } @pk );
        my $biblionumber = $row->{biblionumber};

        # Store the (large) MARCXML blob as a separate object and remove it
        # from the row so the row object stays small; link it to the row.
        if ( my $marcxml = delete $row->{marcxml} ) {
            $riak->request( 'PUT' => "/koha.marcxml/$key", $marcxml, {
                'Content-Type' => 'text/xml',
                'Link'         => qq|</riak/koha.$table/$biblionumber>; riaktag="biblio"|,
            } );
        }

        # warn "## $key ",dump($row);

        # Link non-biblio tables back to their biblio record.
        my $headers;    # NOTE(review): declaration reconstructed — autovivified below
        $headers->{Link} = qq|</riak/koha.biblio/$biblionumber>; riaktag="biblio"|
            if $biblionumber && $key !~ m/biblionumber/;

        $riak->request( 'PUT' => "/koha.$table/$key", $row, $headers );
    }

    $offset += $sth->rows;
    # A short (or empty) batch means the table is exhausted.
    last if $sth->rows < $batch_size;
}

warn "END total_rows $offset\n";