X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FXapcmd.pm;h=7414c9b6802db909fa03046b52a92a263f017e0d;hb=95bdac7f09c69036efed537a4d03d5bdd2ae4eb6;hp=544242a36663594d489aa4a54bfa13e77e30cdd6;hpb=267371b1273b518215939e817e53733584b68af7;p=public-inbox.git diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index 544242a3..7414c9b6 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -1,12 +1,12 @@ -# Copyright (C) 2018-2019 all contributors +# Copyright (C) 2018-2020 all contributors # License: AGPL-3.0+ package PublicInbox::Xapcmd; use strict; use warnings; -use PublicInbox::Spawn qw(which spawn); +use PublicInbox::Spawn qw(which popen_rd); use PublicInbox::Over; use PublicInbox::SearchIdx; -use File::Temp (); +use File::Temp 0.19 (); # ->newdir use File::Path qw(remove_tree); use File::Basename qw(dirname); use POSIX (); @@ -97,7 +97,7 @@ sub runnable_or_die ($) { sub prepare_reindex ($$$) { my ($ibx, $im, $reindex) = @_; - if ($ibx->{version} == 1) { + if ($ibx->version == 1) { my $dir = $ibx->search->xdir(1); my $xdb = $PublicInbox::Search::X{Database}->new($dir); if (my $lc = $xdb->get_metadata('last_commit')) { @@ -122,7 +122,8 @@ sub same_fs_or_die ($$) { } sub process_queue { - my ($queue, $cb, $max, $opt) = @_; + my ($queue, $cb, $opt) = @_; + my $max = $opt->{jobs} || scalar(@$queue); if ($max <= 1) { while (defined(my $args = shift @$queue)) { $cb->($args, $opt); @@ -152,40 +153,21 @@ sub setup_signals () { $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) }; } -sub run { - my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' - my $cb = \&${\"PublicInbox::Xapcmd::$task"}; - PublicInbox::Admin::progress_prepare($opt ||= {}); - my $dir = $ibx->{inboxdir} or die "no inboxdir in inbox\n"; - runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; - my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } - my $from; # per-epoch ranges - - if (!$opt->{-coarse_lock}) { - $reindex = $opt->{reindex} = {}; - $from = $reindex->{from} = []; - require PublicInbox::SearchIdx; - PublicInbox::SearchIdx::load_xapian_writable(); - } +sub prepare_run { + my ($ibx, $opt) = @_; + my $tmp = {}; # old shard dir => File::Temp->newdir object or undef + my @queue; # ([old//src,newdir]) - list of args for cpdb() or compact() - $ibx->umask_prepare; my $old = $ibx->search->xdir(1); -d $old or die "$old does not exist\n"; - - my $tmp = {}; - my $v = $ibx->{version} ||= 1; - my @q; my $reshard = $opt->{reshard}; if (defined $reshard && $reshard <= 0) { die "--reshard must be a positive number\n"; } - local %SIG = %SIG; - setup_signals(); - # we want temporary directories to be as deep as possible, # so v2 shards can keep "xap$SCHEMA_VERSION" on a separate FS. - if ($v == 1) { + if ($ibx->version == 1) { if (defined $reshard) { warn "--reshard=$reshard ignored for v1 $ibx->{inboxdir}\n"; @@ -195,7 +177,7 @@ sub run { my $v = PublicInbox::Search::SCHEMA_VERSION(); my $wip = File::Temp->newdir("xapian$v-XXXXXXXX", DIR => $dir); $tmp->{$old} = $wip; - push @q, [ $old, $wip ]; + push @queue, [ $old, $wip ]; } else { opendir my $dh, $old or die "Failed to opendir $old: $!\n"; my @old_shards; @@ -224,7 +206,7 @@ sub run { my $wip = File::Temp->newdir($tmpl, DIR => $old); same_fs_or_die($old, $wip->dirname); my $cur = "$old/$dn"; - push @q, [ $src // $cur , $wip ]; + push @queue, [ $src // $cur , $wip ]; $tmp->{$cur} = $wip; } # mark old shards to be unlinked @@ -232,10 +214,32 @@ sub run { $tmp->{$_} ||= undef for @$src; } } - my $max = $opt->{jobs} || scalar(@q); + ($tmp, \@queue); +} + +sub run { + my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' + my $cb = \&${\"PublicInbox::Xapcmd::$task"}; + PublicInbox::Admin::progress_prepare($opt ||= {}); + defined(my $dir = $ibx->{inboxdir}) or die "no inboxdir defined\n"; + -d $dir or die "inboxdir=$dir does not exist\n"; + runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; + my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } + + if (!$opt->{-coarse_lock}) { + $reindex = $opt->{reindex} = {}; + $reindex->{from} = []; # per-epoch ranges + require PublicInbox::SearchIdx; + PublicInbox::SearchIdx::load_xapian_writable(); + } + + local %SIG = %SIG; + setup_signals(); + $ibx->umask_prepare; $ibx->with_umask(sub { my $im = $ibx->importer(0); $im->lock_acquire; + my ($tmp, $queue) = prepare_run($ibx, $opt); # fine-grained locking if we prepare for reindex if (!$opt->{-coarse_lock}) { @@ -244,7 +248,7 @@ sub run { } $ibx->cleanup; - process_queue(\@q, $cb, $max, $opt); + process_queue($queue, $cb, $opt); $im->lock_acquire if !$opt->{-coarse_lock}; commit_changes($ibx, $im, $tmp, $opt); }); @@ -277,7 +281,6 @@ sub compact ($$) { my ($args, $opt) = @_; my ($src, $newdir) = @$args; my $dst = ref($newdir) ? $newdir->dirname : $newdir; - my ($r, $w); my $pfx = $opt->{-progress_pfx} ||= progress_pfx($src); my $pr = $opt->{-progress}; my $rdr = {}; @@ -286,7 +289,6 @@ sub compact ($$) { defined(my $dfd = $opt->{$fd}) or next; $rdr->{$fd} = $dfd; } - $rdr->{1} = $w if $pr && pipe($r, $w); # we rely on --no-renumber to keep docids synched to NNTP my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ]; @@ -299,18 +301,14 @@ sub compact ($$) { } $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr; push @$cmd, $src, $dst; - my $pid = spawn($cmd, undef, $rdr); - if ($pr) { - close $w or die "close: \$w: $!"; - foreach (<$r>) { + my $rd = popen_rd($cmd, undef, $rdr); + while (<$rd>) { + if ($pr) { s/\r/\r$pfx /g; $pr->("$pfx $_"); } } - my $rp = waitpid($pid, 0); - if ($? || $rp != $pid) { - die join(' ', @$cmd)." failed: $? (pid=$pid, reaped=$rp)\n"; - } + close $rd or die join(' ', @$cmd)." failed: $?n"; } sub cpdb_loop ($$$;$$) {