X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FXapcmd.pm;h=7414c9b6802db909fa03046b52a92a263f017e0d;hb=95bdac7f09c69036efed537a4d03d5bdd2ae4eb6;hp=de2ef5c6ad814dae358611f97ef6fe9ba955c9fb;hpb=d4e33279556d7eed492f4e3ba0deade9a31b4aeb;p=public-inbox.git diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index de2ef5c6..7414c9b6 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2019 all contributors +# Copyright (C) 2018-2020 all contributors # License: AGPL-3.0+ package PublicInbox::Xapcmd; use strict; @@ -97,7 +97,7 @@ sub runnable_or_die ($) { sub prepare_reindex ($$$) { my ($ibx, $im, $reindex) = @_; - if ($ibx->{version} == 1) { + if ($ibx->version == 1) { my $dir = $ibx->search->xdir(1); my $xdb = $PublicInbox::Search::X{Database}->new($dir); if (my $lc = $xdb->get_metadata('last_commit')) { @@ -122,7 +122,8 @@ sub same_fs_or_die ($$) { } sub process_queue { - my ($queue, $cb, $max, $opt) = @_; + my ($queue, $cb, $opt) = @_; + my $max = $opt->{jobs} || scalar(@$queue); if ($max <= 1) { while (defined(my $args = shift @$queue)) { $cb->($args, $opt); @@ -152,40 +153,21 @@ sub setup_signals () { $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) }; } -sub run { - my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' - my $cb = \&${\"PublicInbox::Xapcmd::$task"}; - PublicInbox::Admin::progress_prepare($opt ||= {}); - my $dir = $ibx->{inboxdir} or die "no inboxdir in inbox\n"; - runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; - my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } - my $from; # per-epoch ranges - - if (!$opt->{-coarse_lock}) { - $reindex = $opt->{reindex} = {}; - $from = $reindex->{from} = []; - require PublicInbox::SearchIdx; - PublicInbox::SearchIdx::load_xapian_writable(); - } +sub prepare_run { + my ($ibx, $opt) = @_; + my $tmp = {}; # old shard dir => File::Temp->newdir object or undef + my @queue; # ([old//src,newdir]) - list of args for cpdb() or compact() - $ibx->umask_prepare; my $old = $ibx->search->xdir(1); -d $old or die "$old does not exist\n"; - - my $tmp = {}; - my $v = $ibx->{version} ||= 1; - my @q; my $reshard = $opt->{reshard}; if (defined $reshard && $reshard <= 0) { die "--reshard must be a positive number\n"; } - local %SIG = %SIG; - setup_signals(); - # we want temporary directories to be as deep as possible, # so v2 shards can keep "xap$SCHEMA_VERSION" on a separate FS. - if ($v == 1) { + if ($ibx->version == 1) { if (defined $reshard) { warn "--reshard=$reshard ignored for v1 $ibx->{inboxdir}\n"; @@ -195,7 +177,7 @@ sub run { my $v = PublicInbox::Search::SCHEMA_VERSION(); my $wip = File::Temp->newdir("xapian$v-XXXXXXXX", DIR => $dir); $tmp->{$old} = $wip; - push @q, [ $old, $wip ]; + push @queue, [ $old, $wip ]; } else { opendir my $dh, $old or die "Failed to opendir $old: $!\n"; my @old_shards; @@ -224,7 +206,7 @@ sub run { my $wip = File::Temp->newdir($tmpl, DIR => $old); same_fs_or_die($old, $wip->dirname); my $cur = "$old/$dn"; - push @q, [ $src // $cur , $wip ]; + push @queue, [ $src // $cur , $wip ]; $tmp->{$cur} = $wip; } # mark old shards to be unlinked @@ -232,10 +214,32 @@ sub run { $tmp->{$_} ||= undef for @$src; } } - my $max = $opt->{jobs} || scalar(@q); + ($tmp, \@queue); +} + +sub run { + my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact' + my $cb = \&${\"PublicInbox::Xapcmd::$task"}; + PublicInbox::Admin::progress_prepare($opt ||= {}); + defined(my $dir = $ibx->{inboxdir}) or die "no inboxdir defined\n"; + -d $dir or die "inboxdir=$dir does not exist\n"; + runnable_or_die($XAPIAN_COMPACT) if $opt->{compact}; + my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } } + + if (!$opt->{-coarse_lock}) { + $reindex = $opt->{reindex} = {}; + $reindex->{from} = []; # per-epoch ranges + require PublicInbox::SearchIdx; + PublicInbox::SearchIdx::load_xapian_writable(); + } + + local %SIG = %SIG; + setup_signals(); + $ibx->umask_prepare; $ibx->with_umask(sub { my $im = $ibx->importer(0); $im->lock_acquire; + my ($tmp, $queue) = prepare_run($ibx, $opt); # fine-grained locking if we prepare for reindex if (!$opt->{-coarse_lock}) { @@ -244,7 +248,7 @@ sub run { } $ibx->cleanup; - process_queue(\@q, $cb, $max, $opt); + process_queue($queue, $cb, $opt); $im->lock_acquire if !$opt->{-coarse_lock}; commit_changes($ibx, $im, $tmp, $opt); });