From: Dobrica Pavlinusic Date: Fri, 29 Jan 2010 17:46:14 +0000 (+0100) Subject: join another table and iterate over it X-Git-Url: http://git.rot13.org/?p=mongodb-experiments.git;a=commitdiff_plain;h=1dfe6030765620a1be32bb5e8a81adcf61b692fb;ds=sidebyside join another table and iterate over it This is needed to keep our memory usage down, since DBD::mysql in combination with DBD::Gofer will fetch whole result set in RAM --- diff --git a/dbi2mongo.pl b/dbi2mongo.pl index 653d74d..faf4da7 100755 --- a/dbi2mongo.pl +++ b/dbi2mongo.pl @@ -15,11 +15,15 @@ my $debug = @ARGV ? 1 : 0; our ( $dbi, $user, $password ) = ( "DBI:mysql:database=test" ); our ( $database, $collection ) = ( 'test', 'test' ); -our ( $table, $pk ) = ( 'items', 'id' ); +our ( $table, $pk ) = ( 'biblio' => 'biblionumber' ); +our ( $table2, $fk ) = ( 'biblioitems' => 'biblionumber' ); + +my $limit = 50000; +my $join_limit = 10000; require 'config.pl'; -warn "# $dbi $user -> $database $collection\n"; +warn "# $dbi $user -> $database $collection $table.$pk<->$table2.$fk\n"; my $conn = MongoDB::Connection->new; my $db = $conn->get_database( $database ); @@ -36,7 +40,7 @@ my $last = $coll->query()->sort({ '_id' => -1 })->limit(1)->next; warn dump( $last ); my $last_id = $last->{_id} || 0; -print "Fetching items from $dbi _id > $last_id\n"; +print "import $table.$pk > $last_id from $dbi\n"; my $sth = $dbh->prepare(qq{ select @@ -45,18 +49,70 @@ my $sth = $dbh->prepare(qq{ from $table where $pk > ? order by $pk asc - limit 100000 + limit $limit }); $sth->execute( $last_id ); +warn "# $table columns ",dump( $sth->{NAME} ); +print "import ",$sth->rows," from $table\n"; + +our $join_offset = 0; +our $sth_join; +our $join_more = 0; + +sub join_table { + $sth_join = $dbh->prepare(qq{ + select + $table2.* + from $table2 + where $fk > ? + order by $fk asc + limit $join_limit + offset $join_offset + }); + print STDERR "$join_offset"; + $sth_join->execute( $last_id ); + warn "# $table2 columns ",dump( $sth_join->{NAME} ); + print "join ",$sth_join->rows," from $table2 offset $join_offset limit $join_limit\n"; + $join_more = $sth_join->rows == $join_limit ? 1 : 0; +} + +our $row_join; -warn dump( $sth->{NAME} ); +sub fetch_row_join { + $row_join = $sth_join->fetchrow_hashref(); + if ( ! $row_join && $join_more ) { + $join_offset += $join_limit; + join_table; + $row_join = $sth_join->fetchrow_hashref(); + } + return $row_join; +} -print "found ",$sth->rows," items to process...\n"; +sub guess_types { + my $row = shift; + map { $row->{$_} * 1 } grep { defined $row->{$_} && $row->{$_} =~ /^\d+$/ } keys %$row; + return $row; +} + +join_table; +fetch_row_join; while (my $row = $sth->fetchrow_hashref() ) { - map { $row->{$_} * 1 } grep { defined $row->{$_} && $row->{$_} =~ /^\d+$/ } keys %$row; - $coll->insert( $row ); + while ( $row_join && $row_join->{$fk} < $row->{$pk} ) { + fetch_row_join; + } + + while ( $row_join && $row_join->{$fk} == $row->{$pk} ) { + push @{ $row->{ $table2 } }, guess_types($row_join); + print STDERR "j"; + fetch_row_join; + } + + $coll->insert( guess_types($row) ); + print STDERR "."; + } +print STDERR "\n";