iterate in 10000 rows chunks over primary table
[mongodb-experiments.git] / dbi2mongo.pl
1 #!/usr/bin/perl -w
2
3 sub BEGIN {
4 $ENV{DBI_AUTOPROXY}='dbi:Gofer:transport=stream;url=ssh:dpavlin@koha.ffzg.hr';
5 }
6
7 use strict;
8 use DBI;
9 use MongoDB;
10 use Data::Dump qw/dump/;
11
12 $|++;
13
14 my $debug = @ARGV ? 1 : 0;
15
16 our ( $dbi, $user, $password ) = ( "DBI:mysql:database=test" );
17 our ( $database, $collection ) = ( 'test', 'test' );
18 our ( $table,  $pk ) = ( 'biblio'      => 'biblionumber' );
19 our ( $table2, $fk ) = ( 'biblioitems' => 'biblionumber' );
20
21 my $limit      = 10000;
22 my $join_limit = 10000;
23
24 require 'config.pl';
25
26 warn "# $dbi $user -> $database $collection $table.$pk<->$table2.$fk\n";
27
28 my $conn = MongoDB::Connection->new;
29 my $db   = $conn->get_database( $database );
30 my $coll = $db->get_collection( $collection );
31 my $dbh  = DBI->connect($dbi,$user,$password, {
32         RaiseError => 1,
33 #       mysql_enable_utf8 => 1,
34 });
35
36 $db->drop if $debug;
37
38 # db.items.find().sort({_id:-1}).limit(1);
39 my $last = $coll->query()->sort({ '_id' => -1 })->limit(1)->next;
40 warn dump( $last );
41 my $last_id = $last->{_id} || 0;
42
43 print "import $table.$pk > $last_id from $dbi\n";
44
45 our $offset = 0;
46 our $sth;
47
48 sub select_table {
49
50         $sth = $dbh->prepare(qq{
51                 select
52                         $pk as _id,
53                         $table.*
54                 from $table
55                 where $pk > ?
56                 order by $pk asc
57                 limit $limit
58                 offset $offset
59         });
60
61         print STDERR " $table:$offset ";
62         $sth->execute( $last_id );
63 #       warn "# $table columns ",dump( $sth->{NAME} ) if $offset == 0;
64         print STDERR " join ", $sth->rows, " rows ";
65 }
66
67 our $row;
68 sub fetch_row {
69         $row = $sth->fetchrow_hashref();
70         if ( ! $row && $sth->rows == $limit ) {
71                 $offset += $limit;
72                 select_table;
73                 $row = $sth->fetchrow_hashref();
74         }
75         return $row;
76 }
77
78 our $join_offset = 0;
79 our $sth_join;
80
81 sub join_table {
82         $sth_join = $dbh->prepare(qq{
83                 select
84                         $table2.*
85                 from $table2
86                 where $fk > ?
87                 order by $fk asc
88                 limit $join_limit
89                 offset $join_offset
90         });
91         print STDERR " $table2:$join_offset ";
92         $sth_join->execute( $last_id );
93 #       warn "# $table2 columns ",dump( $sth_join->{NAME} ) if $join_offset = 0;
94         print STDERR " join ",$sth_join->rows, " rows ";
95 }
96
97 our $row_join;
98 sub fetch_row_join {
99         $row_join = $sth_join->fetchrow_hashref();
100         if ( ! $row_join && $sth_join->rows == $join_limit ) {
101                 $join_offset += $join_limit;
102                 join_table;
103                 $row_join = $sth_join->fetchrow_hashref();
104         }
105         return $row_join;
106 }
107
108 sub guess_types {
109         my $row = shift;
110         map { $row->{$_} * 1 } grep { defined $row->{$_} && $row->{$_} =~ /^\d+$/ } keys %$row;
111         return $row;
112 }
113
114 select_table;
115 join_table;
116 fetch_row_join;
117
118 while (my $row = fetch_row() ) {
119
120         while ( $row_join && $row_join->{$fk} < $row->{$pk} ) {
121                 fetch_row_join;
122         }
123
124         while ( $row_join && $row_join->{$fk} == $row->{$pk} ) {
125                 push @{ $row->{ $table2 } }, guess_types($row_join);
126                 print STDERR "j";
127                 fetch_row_join;
128         }
129
130         $coll->insert( guess_types($row) );
131         print STDERR ".";
132
133 }
134
135 print STDERR "\n";