use strict;
use DBI;
-use Net::Riak;
+use RiakSearch;
+# NOTE(review): Net::Riak replaced by a local RiakSearch wrapper; the rest of
+# this patch assumes RiakSearch->request( METHOD => $path, $body, $headers )
use Data::Dump qw/dump/;
-my $limit = "limit 3";
-my $riak_url = 'http://10.60.0.92:8098';
-my $dbi = 'DBI:mysql:dbname=koha;host=10.60.0.10;port=3306';
-my @tables = qw(
-biblioitems
-biblio
-);
+
+# usage: script.pl [riak_url] [table] [dbi_dsn] -- each falls back to a default
+my ( $riak_url, $table, $dbi ) = @ARGV;
+
+$riak_url ||= 'http://10.60.0.92:8098';
+$table ||= 'biblioitems';
+$dbi ||= 'DBI:mysql:dbname=koha;host=10.60.0.10;port=3306';
+
+# rows fetched per SELECT; a batch shorter than this ends the import loop below
+my $batch_size = 1000;
my $dbh = DBI->connect($dbi,"","") || die $DBI::errstr;
-my $riak = Net::Riak->new(host => $riak_url );
+my $riak = RiakSearch->new( $riak_url );
+#$riak->{args} = 'w=2';
+
+
+# Install the riak_search key/value precommit hook on a bucket, so every
+# object PUT into it is full-text indexed by Riak Search.
+# Arg: bucket name (e.g. 'koha.marcxml'); uses the global $riak handle.
+# Side effects: PUTs bucket props, then warns the props read back.
+sub riak_search_kv_hook {
+	my $bucket = shift;
+	$riak->request( 'PUT' => $bucket, { props => {
+		precommit => [ { mod => 'riak_search_kv_hook', fun => 'precommit' } ],
+#		precommit => [],
+#		last_write_wins => 'true',
+		}
+	});
+	# read back the props of the bucket we just modified; the GET path was
+	# hard-coded to '/koha.marcxml', so the debug output reported the wrong
+	# bucket whenever the hook was installed on "koha.$table"
+	warn "riak_search_kv_hook $bucket ", $riak->request( 'GET' => "/$bucket" );
+}
-my $xml_bucket = $riak->bucket( 'koha.marcxml' );
-$xml_bucket->set_properties({
- precommit => [ { mod => 'riak_search_kv_hook', fun => 'precommit' } ],
-});
+# register the search precommit hook on both buckets this run writes to
+riak_search_kv_hook 'koha.marcxml';
+riak_search_kv_hook "koha.$table";
+# page through the source table in $batch_size chunks
+# NOTE(review): LIMIT/OFFSET with no ORDER BY assumes a stable row order
+# between batches -- confirm, or add ORDER BY on the primary key
+my $offset = 0;
+while(1) {
-foreach my $table ( @tables ) {
+ my $limit = "LIMIT $batch_size OFFSET $offset";
+ warn "SELECT * FROM $table $limit\n";
my $sth = $dbh->prepare(qq{ select * from $table $limit}) || die $dbh->errstr();
$sth->execute || die $sth->errstr();
+# NOTE(review): $sth->rows after a SELECT is driver-specific (works with
+# DBD::mysql store_result, unreliable elsewhere) -- see DBI docs
print "import ", $sth->rows, " rows from $table pk:",dump( @pk ),"...\n";
- my $bucket = $riak->bucket( 'koha.' . $table );
- $bucket->set_properties({
- precommit => [ { mod => 'riak_search_kv_hook', fun => 'precommit' } ],
- });
-
while (my $row = $sth->fetchrow_hashref() ) {
+# the Riak key is joined from the primary-key columns listed in @pk
+# (presumably defined earlier in this file; not visible in this hunk -- verify)
my $key = join('_', map { $row->{$_} } @pk);
+ my $biblionumber = $row->{biblionumber};
if ( my $marcxml = delete $row->{marcxml} ) {
- my $request = $riak->client->new_request(
- 'PUT', [ 'riak', "koha.marcxml/$key" ]
- );
- $request->header('Content-Type' => 'text/xml');
- $request->content($marcxml);
- my $response = $riak->client->send_request($request);
-
- warn "$riak_url/riak/koha.marcxml/$key ", length($marcxml), " bytes\n";
-
- unless ($response->is_success) {
- die "Error put marcxml:", dump( $response );
- }
+# store the raw MARCXML in its own bucket, linked back to this table's object
+ $riak->request( 'PUT' => "/koha.marcxml/$key", $marcxml, {
+ 'Content-Type' => 'text/xml',
+ 'Link' => qq|</riak/koha.$table/$biblionumber>; riaktag="biblio"|,
+ } );
}
- warn "# $key ",dump($row);
- $bucket->new_object( $key => $row )->store;
- warn "$riak_url/riak/koha.$table/$key\n";
+# warn "## $key ",dump($row);
+
+# build Link headers to related biblio/biblioitem objects, skipping any
+# reference column that is already part of this row's own key
+ my $headers;
+ foreach my $reference ( qw(biblio biblioitem) ) {
+ my $col = $reference . 'number';
+ next if $key =~ m/$col/;
+ my $number = $row->{$col} || next;
+ push @{ $headers->{Link} }, qq|</riak/koha.$reference/$number>; riaktag="$reference"|;
+ }
+
+# store the remaining columns as the object body; $headers stays undef when
+# there is nothing to link -- RiakSearch->request must tolerate that (verify)
+ $riak->request( 'PUT' => "/koha.$table/$key", $row, $headers );
}
+ $offset += $sth->rows;
+# a batch shorter than $batch_size means the table is exhausted
+ last if $sth->rows < $batch_size;
+
}
+
+warn "END total_rows $offset\n";