join another table and iterate over it
authorDobrica Pavlinusic <dpavlin@rot13.org>
Fri, 29 Jan 2010 17:46:14 +0000 (18:46 +0100)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Fri, 29 Jan 2010 17:46:14 +0000 (18:46 +0100)
This is needed to keep our memory usage down, since DBD::mysql in
combination with DBD::Gofer will fetch whole result set in RAM

dbi2mongo.pl

index 653d74d..faf4da7 100755 (executable)
@@ -15,11 +15,15 @@ my $debug = @ARGV ? 1 : 0;
 
 our ( $dbi, $user, $password ) = ( "DBI:mysql:database=test" );
 our ( $database, $collection ) = ( 'test', 'test' );
-our ( $table, $pk ) = ( 'items', 'id' );
+our ( $table,  $pk ) = ( 'biblio'      => 'biblionumber' );
+our ( $table2, $fk ) = ( 'biblioitems' => 'biblionumber' );
+
+my $limit      = 50000;
+my $join_limit = 10000;
 
 require 'config.pl';
 
-warn "# $dbi $user -> $database $collection\n";
+warn "# $dbi $user -> $database $collection $table.$pk<->$table2.$fk\n";
 
 my $conn = MongoDB::Connection->new;
 my $db   = $conn->get_database( $database );
@@ -36,7 +40,7 @@ my $last = $coll->query()->sort({ '_id' => -1 })->limit(1)->next;
 warn dump( $last );
 my $last_id = $last->{_id} || 0;
 
-print "Fetching items from $dbi _id > $last_id\n";
+print "import $table.$pk > $last_id from $dbi\n";
 
 my $sth = $dbh->prepare(qq{
        select
@@ -45,18 +49,70 @@ my $sth = $dbh->prepare(qq{
        from $table
        where $pk > ?
        order by $pk asc
-       limit 100000
+       limit $limit
 });
 
 $sth->execute( $last_id );
+warn "# $table columns ",dump( $sth->{NAME} );
+print "import ",$sth->rows," from $table\n";
+
+our $join_offset = 0;
+our $sth_join;
+our $join_more = 0;
+
+sub join_table {
+       $sth_join = $dbh->prepare(qq{
+               select
+                       $table2.*
+               from $table2
+               where $fk > ?
+               order by $fk asc
+               limit $join_limit
+               offset $join_offset
+       });
+       print STDERR "$join_offset";
+       $sth_join->execute( $last_id );
+       warn "# $table2 columns ",dump( $sth_join->{NAME} );
+       print "join ",$sth_join->rows," from $table2 offset $join_offset limit $join_limit\n";
+       $join_more = $sth_join->rows == $join_limit ? 1 : 0;
+}
+
+our $row_join;
 
-warn dump( $sth->{NAME} );
+sub fetch_row_join {
+       $row_join = $sth_join->fetchrow_hashref();
+       if ( ! $row_join && $join_more ) {
+               $join_offset += $join_limit;
+               join_table;
+               $row_join = $sth_join->fetchrow_hashref();
+       }
+       return $row_join;
+}
 
-print "found ",$sth->rows," items to process...\n";
+sub guess_types {
+       my $row = shift;
+       map { $row->{$_} * 1 } grep { defined $row->{$_} && $row->{$_} =~ /^\d+$/ } keys %$row;
+       return $row;
+}
+
+join_table;
+fetch_row_join;
 
 while (my $row = $sth->fetchrow_hashref() ) {
 
-       map { $row->{$_} * 1 } grep { defined $row->{$_} && $row->{$_} =~ /^\d+$/ } keys %$row;
-       $coll->insert( $row );
+       while ( $row_join && $row_join->{$fk} < $row->{$pk} ) {
+               fetch_row_join;
+       }
+
+       while ( $row_join && $row_join->{$fk} == $row->{$pk} ) {
+               push @{ $row->{ $table2 } }, guess_types($row_join);
+               print STDERR "j";
+               fetch_row_join;
+       }
+
+       $coll->insert( guess_types($row) );
+       print STDERR ".";
+
 }
 
+print STDERR "\n";