#!/usr/bin/perl

use strict;
use lib "/usr/local/BackupPC/lib";

use DBI;
use BackupPC::Lib;
use BackupPC::View;
use Data::Dumper;
use Getopt::Long::Descriptive;
use Time::HiRes qw/time/;
use File::Pid;
use POSIX qw/strftime/;
use BackupPC::Search;
use Cwd qw/abs_path/;
use Data::Dump qw(dump);

use constant BPC_FTYPE_DIR => 5;
use constant EST_CHUNK     => 4096;

# daylight saving time change offset for 1h
my $dst_offset = 60 * 60;

my $debug = 0;
$| = 1;

my $start_t = time();

my $pid_path = abs_path($0);
$pid_path =~ s/\W+/_/g;

my $pidfile = File::Pid->new({
    file => "/tmp/search_update.pid",
});

if ( my $pid = $pidfile->running ) {
    die "$0 already running: $pid\n";
} elsif ( $pidfile->pid ne $$ ) {
    $pidfile->remove;
    $pidfile = File::Pid->new;
}
print STDERR "$0 using pid ", $pidfile->pid, " file ", $pidfile->file, "\n";
$pidfile->write;

my $t_fmt = '%Y-%m-%d %H:%M:%S';

my $hosts;
my $bpc = BackupPC::Lib->new || die;
my %Conf   = $bpc->Conf();
my $TopDir = $bpc->TopDir();
my $beenThere = {};

my $dsn  = $Conf{SearchDSN} || die "Need SearchDSN in config.pl\n";
my $user = $Conf{SearchUser} || '';

my $index_node_url = $Conf{HyperEstraierIndex};

my $dbh = DBI->connect($dsn, $user, "", { RaiseError => 1, AutoCommit => 0 });

my ($opt, $usage) = describe_options(
    "%c %o",
    [ 'create|c',  "create database on first use" ],
    [ 'delete|d',  "delete database before import" ],
    [ 'max|m=i',   "import just max increments for one host" ],
    [ 'host|h=s@', "import just host(s)" ],
    [],
    [ 'verbose|v:i', 'set verbosity (debug) level' ],
    [ 'index|i',   'update full text index' ],
    [ 'junk|j',    "update full text, don't check existing files" ],
    [ 'fast|f',    "don't do anything with full text index" ],
    [ 'quiet|q',   "be quiet for hosts without changes" ],
    [ 'help',      "show help" ],
);

print($usage->text), exit if $opt->help;

warn "hosts: ", dump( $opt->host );

#---- subs ----

# rewrite $0 so current progress is visible in ps(1) output
sub status {
    my $text = shift;
    $text =~ s{\s+$}{};
    my $new = $0;
    $new =~ s{^[\w\/]+/(\w+) }{$1 }; # strip path from process name
    if ( $text =~ m/^\|/ ) {
        $new =~ s/\|.*/$text/ or $new .= " $text";
    } else {
        $new =~ s/\s+.*/ $text/ or $new .= " $text";
    }
    $0 = $new;
}

sub fmt_time {
    my $t = shift || return;
    my $out = "";
    my ($ss, $mm, $hh) = gmtime($t);
    $out .= "${hh}h" if ($hh);
    $out .= sprintf("%02d:%02d", $mm, $ss);
    return $out;
}

sub curr_time {
    return strftime($t_fmt, localtime());
}

# format an epoch timestamp as an ISO 8601 string
sub fmt_date {
    my $t = shift || return;
    my $iso = BackupPC::Lib::timeStamp($t);
    $iso =~ s/\s/T/;
    return $iso;
}

sub hest_update {

    my ($host_id, $share_id, $num) = @_;

    # honour --fast: don't do anything with the full text index
    return if $opt->fast;

    my $skip_check = $opt->junk &&
        print STDERR "Skipping check for existing files -- this should be used only with initial import\n";

    print curr_time, " updating fulltext:";

    my $t = time();

    my $offset = 0;
    my $added  = 0;

    my $search = BackupPC::Search->search_module;

    my $results = 0;

    do {

        my $where = '';
        my @data;
        if ( defined($host_id) && defined($share_id) && defined($num) ) {
            $where = qq{
            WHERE
                hosts.id = ? AND
                shares.id = ? AND
                files.backupnum = ?
            };
            @data = ( $host_id, $share_id, $num );
        }

        my $limit = sprintf('LIMIT ' . EST_CHUNK . ' OFFSET %d', $offset);
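        # Fetch the next EST_CHUNK-sized slice of files. The enclosing
        # do/while keeps paging with a growing OFFSET until a slice comes
        # back smaller than EST_CHUNK, so the whole result set never has
        # to be held in memory at once.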
        my $sth = $dbh->prepare(qq{
            SELECT
                files.id        AS fid,
                hosts.name      AS hname,
                shares.name     AS sname,
                -- shares.share AS sharename,
                files.backupnum AS backupnum,
                -- files.name   AS filename,
                files.path      AS filepath,
                files.date      AS date,
                files.type      AS type,
                files.size      AS size,
                files.shareid   AS shareid,
                backups.date    AS backup_date
            FROM files
                INNER JOIN shares  ON files.shareID = shares.ID
                INNER JOIN hosts   ON hosts.ID = shares.hostID
                INNER JOIN backups ON backups.num = files.backupNum
                    AND backups.hostID = hosts.ID AND backups.shareID = shares.ID
            $where
            $limit
        });

        $sth->execute(@data);
        $results = $sth->rows;

        if ($results == 0) {
            print " - no new files\n";
            return;
        } else {
            print "...";
        }

        while (my $row = $sth->fetchrow_hashref()) {
            # with --junk the (expensive) existence check is skipped
            next if !$skip_check && $search->exists( $row );
            $search->add_doc( $row );
            $added++;
        }

        print "$added";
        status "| $added";

        $offset += EST_CHUNK;

    } while ($results == EST_CHUNK);

    $search->commit;

    my $dur = (time() - $t) || 1;
    printf(" [%.2f/s dur: %s]\n",
        ( $added / $dur ),
        fmt_time($dur)
    );
}

#---- /subs ----

## update index ##
if ( ( $opt->index || $opt->junk ) && !$opt->create ) {
    # update all
    print "force update of Hyper Estraier index ";
    print "by -i flag" if ($opt->index);
    print "by -j flag" if ($opt->junk);
    print "\n";
    hest_update();
}

## create tables ##
if ($opt->create) {

    sub do_index {
        my $index = shift || return;
        my ($table, $col, $unique) = split(/:/, $index);
        $unique ||= '';
        $index =~ s/\W+/_/g;
        print "$index on $table($col)" . ( $unique ? "u" : "" ) . " ";
        $dbh->do(qq{ create $unique index $index on $table($col) });
    }

    print "creating tables...\n";

    # slurp the whole schema from the __DATA__ section below
    my $sql;
    {
        local $/ = undef;
        $sql = <DATA>;
    }
    $dbh->do( $sql );

    print "creating indexes: ";

    foreach my $index (qw(
        hosts:name
        backups:hostID
        backups:num
        backups:shareID
        shares:hostID
        shares:name
        files:shareID
        files:path
        files:name
        files:date
        files:size
        archive:dvd_nr
        archive_burned:archive_id
        backup_parts:backup_id,part_nr:unique
    )) {
        do_index($index);
    }

    print "...\n";

    $dbh->commit;

}

## delete data before inserting ##
if ($opt->delete) {
    print "deleting ";
    foreach my $table (qw(files dvds backups shares hosts)) {
        print "$table ";
        $dbh->do(qq{ DELETE FROM $table });
    }
    print " done...\n";

    $dbh->commit;
}

## insert new values ##

# get hosts
$hosts = $bpc->HostInfoRead();
my $hostID;
my $shareID;

my $sth;

$sth->{insert_hosts} = $dbh->prepare(qq{
    INSERT INTO hosts (name, IP) VALUES (?, ?)
});

$sth->{hosts_by_name} = $dbh->prepare(qq{
    SELECT id
    FROM hosts
    WHERE name = ?
});

$sth->{backups_count} = $dbh->prepare(qq{
    SELECT COUNT(*)
    FROM backups
    WHERE hostID = ? AND num = ? AND shareid = ?
});

$sth->{insert_backups} = $dbh->prepare(qq{
    INSERT INTO backups (hostID, num, date, type, shareid, size)
    VALUES (?, ?, ?, ?, ?, -1)
});

$sth->{update_backups_size} = $dbh->prepare(qq{
    UPDATE backups SET size = ?
    WHERE hostID = ? and num = ? and date = ? and type = ? and shareid = ?
});

$sth->{insert_files} = $dbh->prepare(qq{
    INSERT INTO files
        (shareID, backupNum, name, path, date, type, size)
        VALUES (?, ?, ?, ?, ?, ?, ?)
});

my @hosts = keys %{$hosts};
my $host_nr = 0;

foreach my $host_key (@hosts) {

    my $hostname = $hosts->{$host_key}->{'host'} || die "can't find host for $host_key";

    next if $opt->host && !grep { m/^$hostname$/ } @{ $opt->host };
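    # find-or-create the host row: look it up by name, insert it (with
    # its IP) on first sight, and remember the resulting hostID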
    $sth->{hosts_by_name}->execute($hostname);
    unless ( ($hostID) = $sth->{hosts_by_name}->fetchrow_array() ) {
        $sth->{insert_hosts}->execute(
            $hosts->{$host_key}->{'host'},
            $hosts->{$host_key}->{'ip'}
        );
        $hostID = $dbh->last_insert_id(undef, undef, 'hosts', undef);
    }

    $host_nr++;

    # get backups for a host
    my @backups = $bpc->BackupInfoRead($hostname);
    my $incs = scalar @backups;

    my $host_header = sprintf("host %s [%d/%d]: %d increments\n",
        $hosts->{$host_key}->{'host'},
        $host_nr,
        ($#hosts + 1),
        $incs
    );
    print $host_header unless $opt->quiet;

    my $inc_nr = 0;
    $beenThere = {};

    foreach my $backup (@backups) {

        $inc_nr++;
        last if defined $opt->max && $inc_nr > $opt->max;

        my $backupNum = $backup->{'num'};
        my @backupShares = ();

        my $share_header = sprintf("%-10s %2d/%-2d #%-2d %s %5s/%5s files (date: %s dur: %s)\n",
            $hosts->{$host_key}->{'host'},
            $inc_nr, $incs, $backupNum,
            $backup->{type} || '?',
            $backup->{nFilesNew} || '?', $backup->{nFiles} || '?',
            strftime($t_fmt, localtime($backup->{startTime})),
            fmt_time($backup->{endTime} - $backup->{startTime})
        );
        print $share_header unless $opt->quiet;
        status "$hostname $backupNum $share_header";

        my $files = BackupPC::View->new($bpc, $hostname, \@backups, { only_increment => 1 });

        foreach my $share ( $files->shareList($backupNum) ) {

            my $t = time();

            $shareID = getShareID($share, $hostID, $hostname);

            $sth->{backups_count}->execute($hostID, $backupNum, $shareID);
            my ($count) = $sth->{backups_count}->fetchrow_array();
            # skip if already in database!
            next if ($count > 0);

            # dump host and share header for -q
            if ( $opt->quiet ) {
                if ($host_header) {
                    print $host_header;
                    $host_header = undef;
                }
                print $share_header;
            }

            # dump some log
            print curr_time, " ", $share;

            $sth->{insert_backups}->execute(
                $hostID,
                $backupNum,
                $backup->{'endTime'},
                substr($backup->{'type'}, 0, 4),
                $shareID,
            );

            my ($f, $nf, $d, $nd, $size) =
                recurseDir($bpc, $hostname, $files, $backupNum, $share, "", $shareID);

            eval {
                $sth->{update_backups_size}->execute(
                    $size,
                    $hostID,
                    $backupNum,
                    $backup->{'endTime'},
                    substr($backup->{'type'}, 0, 4),
                    $shareID,
                );
                print " commit";
                $dbh->commit();
            };
            if ($@) {
                print " rollback";
                $dbh->rollback();
            }

            my $dur = (time() - $t) || 1;
            my $status = sprintf("%d/%d files %d/%d dirs %0.2f MB [%.2f/s dur: %s]",
                $nf, $f, $nd, $d,
                ($size / 1024 / 1024),
                ( ($f + $d) / $dur ),
                fmt_time($dur)
            );
            print " $status\n";
            status "$hostname $backupNum $status";

            if ($nf + $nd > 0) {
                status "$hostname $backupNum full-text | indexing";
                #eval { hest_update($hostID, $shareID, $backupNum) };
                #warn "ERROR: $@" if $@;
                hest_update($hostID, $shareID, $backupNum);
                # eval breaks our re-try logic
            }

        }
    }
}

undef $sth;
$dbh->commit();
$dbh->disconnect();

print "total duration: ", fmt_time(time() - $start_t), "\n";

$pidfile->remove;

sub getShareID {
    my ($share, $hostID, $hostname) = @_;

    $sth->{share_id} ||= $dbh->prepare(qq{
        SELECT ID FROM shares WHERE hostID = ? AND name = ?
    });

    $sth->{share_id}->execute($hostID, $share);

    my ($id) = $sth->{share_id}->fetchrow_array();

    return $id if defined($id);

    $sth->{insert_share} ||= $dbh->prepare(qq{
        INSERT INTO shares (hostID, name, share)
        VALUES (?, ?, ?)
    });
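    # presumably the value shown in the web interface drop-down:
    # "hostname/share", with duplicate slashes collapsed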
    my $drop_down = $hostname . '/' . $share;
    $drop_down =~ s#//+#/#g;

    $sth->{insert_share}->execute($hostID, $share, $drop_down);
    return $dbh->last_insert_id(undef, undef, 'shares', undef);
}

sub found_in_db {

    my @data = @_;
    shift @data;    # @data is what insert_files expects (without $key)

    my ($key, $shareID, undef, $name, $path, $date, undef, $size) = @_;

    return $beenThere->{$key} if defined( $beenThere->{$key} );

    $sth->{file_in_db} ||= $dbh->prepare(qq{
        SELECT 1 FROM files
        WHERE shareID = ? and
            path = ? and
            size = ? and
            ( date = ? or date = ? or date = ? )
        LIMIT 1
    });

    # match the exact mtime, or the same mtime shifted by one hour in
    # either direction, to tolerate daylight saving time changes
    my @param = ($shareID, $path, $size, $date, $date - $dst_offset, $date + $dst_offset);
    $sth->{file_in_db}->execute(@param);
    my $rows = $sth->{file_in_db}->rows;
    print STDERR "## found_in_db($shareID,$path,$date,$size) ",
        ( $rows ? '+' : '-' ), join(" ", @param), "\n" if ($debug >= 3);

    $beenThere->{$key}++;

    $sth->{'insert_files'}->execute(@data) unless ($rows);

    return $rows;
}

####################################################
# recurse through the filesystem structure and     #
# return a flattened list of files                 #
####################################################
sub recurseDir {

    my ($bpc, $hostname, $files, $backupNum, $share, $dir, $shareID) = @_;

    print STDERR "\nrecurse($hostname,$backupNum,$share,$dir,$shareID)\n" if ($debug >= 1);

    my ($nr_files, $new_files, $nr_dirs, $new_dirs, $size) = (0, 0, 0, 0, 0);

    { # scope
        my @stack;

        print STDERR "# dirAttrib($backupNum, $share, $dir)\n" if ($debug >= 2);
        my $filesInBackup = $files->dirAttrib($backupNum, $share, $dir);

        # first, add all the entries in current directory
        foreach my $path_key (keys %{$filesInBackup}) {

            print STDERR "# file ", Dumper($filesInBackup->{$path_key}), "\n" if ($debug >= 3);
            my @data = (
                $shareID,
                $backupNum,
                $path_key,
                $filesInBackup->{$path_key}->{'relPath'},
                $filesInBackup->{$path_key}->{'mtime'},
                $filesInBackup->{$path_key}->{'type'},
                $filesInBackup->{$path_key}->{'size'}
            );

            my $key = join(" ", (
                $shareID,
                $dir,
                $path_key,
                $filesInBackup->{$path_key}->{'mtime'},
                $filesInBackup->{$path_key}->{'size'}
            ));

            my $key_dst_prev = join(" ", (
                $shareID,
                $dir,
                $path_key,
                $filesInBackup->{$path_key}->{'mtime'} - $dst_offset,
                $filesInBackup->{$path_key}->{'size'}
            ));

            my $key_dst_next = join(" ", (
                $shareID,
                $dir,
                $path_key,
                $filesInBackup->{$path_key}->{'mtime'} + $dst_offset,
                $filesInBackup->{$path_key}->{'size'}
            ));

            my $found;
            if (
                ! defined( $beenThere->{$key} ) &&
                ! defined( $beenThere->{$key_dst_prev} ) &&
                ! defined( $beenThere->{$key_dst_next} ) &&
                ! ($found = found_in_db($key, @data))
            ) {
                print STDERR "# key: $key [", $beenThere->{$key}, "]" if ($debug >= 2);

                if ($filesInBackup->{$path_key}->{'type'} == BPC_FTYPE_DIR) {
                    $new_dirs++ unless ($found);
                    print STDERR " dir\n" if ($debug >= 2);
                } else {
                    $new_files++ unless ($found);
                    print STDERR " file\n" if ($debug >= 2);
                }
                $size += $filesInBackup->{$path_key}->{'size'} || 0;
            }

            if ($filesInBackup->{$path_key}->{'type'} == BPC_FTYPE_DIR) {
                $nr_dirs++;

                my $full_path = $dir . '/' . $path_key;
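                # defer descent: queue the subdirectory and walk it only
                # after every entry of the current directory has been
                # examined (see the while loop over @stack below)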
                push @stack, $full_path;
                print STDERR "### store to stack: $full_path\n" if ($debug >= 3);

#               my ($f,$nf,$d,$nd) = recurseDir($bpc, $hostname, $backups, $backupNum, $share, $path_key, $shareID) unless ($beenThere->{$key});
#
#               $nr_files += $f;
#               $new_files += $nf;
#               $nr_dirs += $d;
#               $new_dirs += $nd;
            } else {
                $nr_files++;
            }
        }

        print STDERR "## STACK ", join(", ", @stack), "\n" if ($debug >= 2);

        while ( my $dir = shift @stack ) {
            my ($f, $nf, $d, $nd, $s) =
                recurseDir($bpc, $hostname, $files, $backupNum, $share, $dir, $shareID);
            print STDERR "# $dir f: $f nf: $nf d: $d nd: $nd\n" if ($debug >= 1);
            $nr_files  += $f;
            $new_files += $nf;
            $nr_dirs   += $d;
            $new_dirs  += $nd;
            $size      += $s;
        }
    }

    return ($nr_files, $new_files, $nr_dirs, $new_dirs, $size);
}

__DATA__

create table hosts (
    ID      SERIAL      PRIMARY KEY,
    name    VARCHAR(30) NOT NULL,
    IP      VARCHAR(15)
);

create table shares (
    ID      SERIAL       PRIMARY KEY,
    hostID  INTEGER      NOT NULL references hosts(id),
    name    VARCHAR(30)  NOT NULL,
    share   VARCHAR(200) NOT NULL
);

create table dvds (
    ID      SERIAL       PRIMARY KEY,
    num     INTEGER      NOT NULL,
    name    VARCHAR(255) NOT NULL,
    mjesto  VARCHAR(255)
);

create table backups (
    id          serial,
    hostID      INTEGER NOT NULL references hosts(id),
    num         INTEGER NOT NULL,
    date        integer NOT NULL,
    type        CHAR(4) not null,
    shareID     integer not null references shares(id),
    size        bigint  not null,
    inc_size    bigint  not null default -1,
    inc_deleted boolean default false,
    parts       integer not null default 0,
    PRIMARY KEY(id)
);

create table backup_parts (
    id          serial,
    backup_id   int references backups(id),
    part_nr     int not null check (part_nr > 0),
    tar_size    bigint not null check (tar_size > 0),
    size        bigint not null check (size > 0),
    md5         text not null,
    items       int not null check (items > 0),
    date        timestamp default now(),
    filename    text not null,
    primary key(id)
);

create table files (
    ID          SERIAL,
    shareID     INTEGER NOT NULL references shares(id),
    backupNum   INTEGER NOT NULL,
    name        VARCHAR(255) NOT NULL,
    path        VARCHAR(255) NOT NULL,
    date        integer NOT NULL,
    type        INTEGER NOT NULL,
    size        bigint  NOT NULL,
    primary key(id)
);

create sequence dvd_nr;

create table archive (
    id          serial,
    dvd_nr      int not null,
    total_size  bigint default -1,
    note        text,
    username    varchar(20) not null,
    date        timestamp default now(),
    primary key(id)
);

create table archive_parts (
    archive_id      int not null references archive(id) on delete cascade,
    backup_part_id  int not null references backup_parts(id),
    primary key(archive_id, backup_part_id)
);

create table archive_burned (
    archive_id  int references archive(id),
    date        timestamp default now(),
    part        int not null default 1,
    copy        int not null default 1,
    iso_size    bigint default -1
);

-- report backups and corresponding dvd
--create view backups_on_dvds as
--select
--  backups.id as id,
--  hosts.name || ':' || shares.name as share,
--  backups.num as num,
--  backups.type as type,
--  abstime(backups.date) as backup_date,
--  backups.size as size,
--  backups.inc_size as gzip_size,
--  archive.id as archive_id,
--  archive.dvd_nr
--from backups
--join shares on backups.shareid=shares.id
--join hosts on shares.hostid = hosts.id
--left outer join archive_backup_parts on backups.id = archive_backup_parts.backup_id
--left outer join archive on archive_backup_parts.archive_id = archive.id
--where backups.parts > 0 and size > 0
--order by backups.date
--;

-- used by BackupPC_ASA_BurnArchiveMedia
CREATE VIEW archive_backup_parts AS
SELECT
    backup_parts.backup_id,
    archive_id,
    dvd_nr,
    backup_part_id,
    hosts.name as host,
    shares.name as share,
    backups.num as num,
    backups.date as date,
    backup_parts.part_nr as part_nr,
    backups.parts as parts,
    backup_parts.size as size,
    backup_parts.md5 as md5,
    backup_parts.items,
    backup_parts.filename
FROM backup_parts
JOIN archive_parts ON backup_parts.id = backup_part_id
JOIN archive ON archive_id = archive.id
JOIN backups ON backup_id = backups.id
JOIN hosts ON hostid = hosts.id
JOIN shares ON shareid = shares.id
;
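-- for every backup that has parts, compare the number of parts with the
-- number of parts already burned to archive media; "burned" is true only
-- when the two counts match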
CREATE VIEW backups_burned AS
SELECT
    backup_parts.backup_id,
    count(backup_parts.backup_id) as backup_parts,
    count(archive_burned.archive_id) AS burned_parts,
    count(backup_parts.backup_id) = count(archive_burned.archive_id) as burned
FROM backup_parts
left outer JOIN archive_parts ON backup_part_id = backup_parts.id
left join archive on archive.id = archive_id
left outer join archive_burned on archive_burned.archive_id = archive.id
GROUP BY backup_parts.backup_id
;

-- triggers for backup_parts consistency
create or replace function backup_parts_check() returns trigger as '
declare
    b_parts   integer;
    b_counted integer;
    b_id      integer;
begin
    -- raise notice ''old/new parts %/% backup_id %/%'', old.parts, new.parts, old.id, new.id;
    if (TG_OP = ''UPDATE'') then
        b_id := new.id;
        b_parts := new.parts;
    elsif (TG_OP = ''INSERT'') then
        b_id := new.id;
        b_parts := new.parts;
    end if;
    b_counted := (select count(*) from backup_parts where backup_id = b_id);
    -- raise notice ''backup % parts %'', b_id, b_parts;
    if ( b_parts != b_counted ) then
        raise exception ''Update of backup % aborted, requested % parts and there are really % parts'', b_id, b_parts, b_counted;
    end if;
    return null;
end;
' language plpgsql;

create trigger do_backup_parts_check
    after insert or update or delete on backups
    for each row execute procedure backup_parts_check();

create or replace function backup_backup_parts_check() returns trigger as '
declare
    b_id        integer;
    my_part_nr  integer;
    calc_part   integer;
begin
    if (TG_OP = ''INSERT'') then
        -- raise notice ''trigger: % backup_id %'', TG_OP, new.backup_id;
        b_id = new.backup_id;
        my_part_nr = new.part_nr;

        execute ''update backups set parts = parts + 1 where id = '' || b_id;
    elsif (TG_OP = ''DELETE'') then
        -- raise notice ''trigger: % backup_id %'', TG_OP, old.backup_id;
        b_id = old.backup_id;
        my_part_nr = old.part_nr;

        execute ''update backups set parts = parts - 1 where id = '' || b_id;
    end if;

    calc_part := (select count(part_nr) from backup_parts where backup_id = b_id);

    if ( my_part_nr != calc_part ) then
        raise exception ''Update of backup_parts with backup_id % aborted, requested part_nr is % and calculated next is %'', b_id, my_part_nr, calc_part;
    end if;
    return null;
end;
' language plpgsql;

create trigger do_backup_backup_parts_check
    after insert or update or delete on backup_parts
    for each row execute procedure backup_backup_parts_check();
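-- Illustrative query (kept commented out so it is not executed when the
-- schema above is slurped from __DATA__): list backups whose parts have
-- not all been burned to media yet.
-- SELECT backup_id FROM backups_burned WHERE NOT burned;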