]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/Xapcmd.pm
update copyrights for 2021
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
index 714f6859734fe20366ef1edabf38928b926835e1..8de516ef5ace4f2c52ca840f52f7fe6ec3979944 100644 (file)
@@ -1,15 +1,15 @@
-# Copyright (C) 2018-2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 package PublicInbox::Xapcmd;
 use strict;
-use warnings;
 use PublicInbox::Spawn qw(which popen_rd nodatacow_dir);
+use PublicInbox::Admin qw(setup_signals);
 use PublicInbox::Over;
 use PublicInbox::SearchIdx;
 use File::Temp 0.19 (); # ->newdir
 use File::Path qw(remove_tree);
 use File::Basename qw(dirname);
-use POSIX ();
+use POSIX qw(WNOHANG);
 
 # support testing with dev versions of Xapian which installs
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
@@ -19,7 +19,6 @@ our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 sub commit_changes ($$$$) {
        my ($ibx, $im, $tmp, $opt) = @_;
        my $reshard = $opt->{reshard};
-       my $reindex = $opt->{reindex};
 
        $SIG{INT} or die 'BUG: $SIG{INT} not handled';
        my @old_shard;
@@ -38,7 +37,7 @@ sub commit_changes ($$$$) {
                        defined $new or die "BUG: $over exists when culling v2";
                        $over = PublicInbox::Over->new($over);
                        my $tmp_over = "$new/over.sqlite3";
-                       $over->connect->sqlite_backup_to_file($tmp_over);
+                       $over->dbh->sqlite_backup_to_file($tmp_over);
                        $over = undef;
                        $over_chg = 1;
                }
@@ -82,7 +81,8 @@ sub commit_changes ($$$$) {
                                $im->{shards} = $n;
                        }
                }
-
+               my $env = $opt->{-idx_env};
+               local %ENV = (%ENV, %$env) if $env;
                PublicInbox::Admin::index_inbox($ibx, $im, $opt);
        }
 }
@@ -101,17 +101,16 @@ sub runnable_or_die ($) {
 }
 
 sub prepare_reindex ($$$) {
-       my ($ibx, $im, $reindex) = @_;
+       my ($ibx, $im, $opt) = @_;
        if ($ibx->version == 1) {
                my $dir = $ibx->search->xdir(1);
                my $xdb = $PublicInbox::Search::X{Database}->new($dir);
                if (my $lc = $xdb->get_metadata('last_commit')) {
-                       $reindex->{from} = $lc;
+                       $opt->{reindex}->{from} = $lc;
                }
        } else { # v2
-               my $max;
-               $im->git_dir_latest(\$max) or return;
-               my $from = $reindex->{from};
+               my $max = $ibx->max_git_epoch // return;
+               my $from = $opt->{reindex}->{from};
                my $mm = $ibx->mm;
                my $v = PublicInbox::Search::SCHEMA_VERSION();
                foreach my $i (0..$max) {
@@ -126,6 +125,11 @@ sub same_fs_or_die ($$) {
        die "$x and $y reside on different filesystems\n";
 }
 
+sub kill_pids {
+       my ($sig, $pids) = @_;
+       kill($sig, keys %$pids); # pids may be empty
+}
+
 sub process_queue {
        my ($queue, $cb, $opt) = @_;
        my $max = $opt->{jobs} // scalar(@$queue);
@@ -138,30 +142,29 @@ sub process_queue {
 
        # run in parallel:
        my %pids;
+       local %SIG = %SIG;
+       setup_signals(\&kill_pids, \%pids);
        while (@$queue) {
                while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
                        my $args = shift @$queue;
                        $pids{cb_spawn($cb, $args, $opt)} = $args;
                }
 
+               my $flags = 0;
                while (scalar keys %pids) {
-                       my $pid = waitpid(-1, 0);
+                       my $pid = waitpid(-1, $flags) or last;
+                       last if $pid < 0;
                        my $args = delete $pids{$pid};
                        if ($args) {
                                die join(' ', @$args)." failed: $?\n" if $?;
                        } else {
                                warn "unknown PID($pid) reaped: $?\n";
                        }
+                       $flags = WNOHANG if scalar(@$queue);
                }
        }
 }
 
-sub setup_signals () {
-       # http://www.tldp.org/LDP/abs/html/exitcodes.html
-       $SIG{INT} = sub { exit(130) };
-       $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) };
-}
-
 sub prepare_run {
        my ($ibx, $opt) = @_;
        my $tmp = {}; # old shard dir => File::Temp->newdir object or undef
@@ -233,14 +236,14 @@ sub prepare_run {
 sub check_compact () { runnable_or_die($XAPIAN_COMPACT) }
 
 sub _run {
-       my ($ibx, $cb, $opt, $reindex) = @_;
+       my ($ibx, $cb, $opt) = @_;
        my $im = $ibx->importer(0);
        $im->lock_acquire;
        my ($tmp, $queue) = prepare_run($ibx, $opt);
 
        # fine-grained locking if we prepare for reindex
        if (!$opt->{-coarse_lock}) {
-               prepare_reindex($ibx, $im, $reindex);
+               prepare_reindex($ibx, $im, $opt);
                $im->lock_release;
        }
 
@@ -252,24 +255,22 @@ sub _run {
 
 sub run {
        my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
-       my $cb = \&${\"PublicInbox::Xapcmd::$task"};
+       my $cb = \&$task;
        PublicInbox::Admin::progress_prepare($opt ||= {});
        defined(my $dir = $ibx->{inboxdir}) or die "no inboxdir defined\n";
        -d $dir or die "inboxdir=$dir does not exist\n";
        check_compact() if $opt->{compact} && $ibx->search;
-       my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
 
        if (!$opt->{-coarse_lock}) {
-               $reindex = $opt->{reindex} = { # per-epoch ranges for v2
-                       from => $ibx->version == 1 ? '' : [],
-               };
+               # per-epoch ranges for v2
+               # v1:{ from => $OID }, v2:{ from => [ $OID, $OID, $OID ] } }
+               $opt->{reindex} = { from => $ibx->version == 1 ? '' : [] };
                PublicInbox::SearchIdx::load_xapian_writable();
        }
 
        local %SIG = %SIG;
        setup_signals();
-       $ibx->umask_prepare;
-       $ibx->with_umask(\&_run, $ibx, $cb, $opt, $reindex);
+       $ibx->with_umask(\&_run, $ibx, $cb, $opt);
 }
 
 sub cpdb_retryable ($$) {
@@ -294,6 +295,11 @@ sub progress_pfx ($) {
        ($p[-1] =~ /\A([0-9]+)/) ? "$p[-2]/$1" : $p[-1];
 }
 
+sub kill_compact { # setup_signals callback
+       my ($sig, $pidref) = @_;
+       kill($sig, $$pidref) if defined($$pidref);
+}
+
 # xapian-compact wrapper
 sub compact ($$) {
        my ($args, $opt) = @_;
@@ -319,14 +325,18 @@ sub compact ($$) {
        }
        $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr;
        push @$cmd, $src, $dst;
-       my $rd = popen_rd($cmd, undef, $rdr);
+       my ($rd, $pid);
+       local %SIG = %SIG;
+       setup_signals(\&kill_compact, \$pid);
+       ($rd, $pid) = popen_rd($cmd, undef, $rdr);
        while (<$rd>) {
                if ($pr) {
                        s/\r/\r$pfx /g;
                        $pr->("$pfx $_");
                }
        }
-       close $rd or die join(' ', @$cmd)." failed: $?n";
+       waitpid($pid, 0);
+       die "@$cmd failed: \$?=$?\n" if $?;
 }
 
 sub cpdb_loop ($$$;$$) {