]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/Xapcmd.pm
index+xcpdb: improve SIG{INT,TERM,HUP,PIPE} behavior
[public-inbox.git] / lib / PublicInbox / Xapcmd.pm
index 8d09ed2761463ce77fd7cc343a18150f014500e4..348621cef0bfd724eb9366d69e9ffd05294def30 100644 (file)
@@ -2,10 +2,10 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 package PublicInbox::Xapcmd;
 use strict;
-use warnings;
-use PublicInbox::Spawn qw(which popen_rd);
+use PublicInbox::Spawn qw(which popen_rd nodatacow_dir);
+use PublicInbox::Admin qw(setup_signals);
 use PublicInbox::Over;
-use PublicInbox::SearchIdx qw(nodatacow_dir);
+use PublicInbox::SearchIdx;
 use File::Temp 0.19 (); # ->newdir
 use File::Path qw(remove_tree);
 use File::Basename qw(dirname);
@@ -53,7 +53,6 @@ sub commit_changes ($$$$) {
                        rename($old, "$new/old") or
                                        die "rename $old => $new/old: $!\n";
                }
-               # Xtmpdir->DESTROY won't remove $new after this:
                rename($new, $old) or die "rename $new => $old: $!\n";
                if (@st) {
                        my $prev = "$old/old";
@@ -127,6 +126,11 @@ sub same_fs_or_die ($$) {
        die "$x and $y reside on different filesystems\n";
 }
 
+sub kill_pids {
+       my ($sig, $pids) = @_;
+       kill($sig, keys %$pids); # pids may be empty
+}
+
 sub process_queue {
        my ($queue, $cb, $opt) = @_;
        my $max = $opt->{jobs} // scalar(@$queue);
@@ -139,6 +143,8 @@ sub process_queue {
 
        # run in parallel:
        my %pids;
+       local %SIG = %SIG;
+       setup_signals(\&kill_pids, \%pids);
        while (@$queue) {
                while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
                        my $args = shift @$queue;
@@ -157,12 +163,6 @@ sub process_queue {
        }
 }
 
-sub setup_signals () {
-       # http://www.tldp.org/LDP/abs/html/exitcodes.html
-       $SIG{INT} = sub { exit(130) };
-       $SIG{HUP} = $SIG{PIPE} = $SIG{TERM} = sub { exit(1) };
-}
-
 sub prepare_run {
        my ($ibx, $opt) = @_;
        my $tmp = {}; # old shard dir => File::Temp->newdir object or undef
@@ -264,7 +264,6 @@ sub run {
                $reindex = $opt->{reindex} = { # per-epoch ranges for v2
                        from => $ibx->version == 1 ? '' : [],
                };
-               require PublicInbox::SearchIdx;
                PublicInbox::SearchIdx::load_xapian_writable();
        }
 
@@ -296,6 +295,11 @@ sub progress_pfx ($) {
        ($p[-1] =~ /\A([0-9]+)/) ? "$p[-2]/$1" : $p[-1];
 }
 
+sub kill_compact { # setup_signals callback
+       my ($sig, $pidref) = @_;
+       kill($sig, $$pidref) if defined($$pidref);
+}
+
 # xapian-compact wrapper
 sub compact ($$) {
        my ($args, $opt) = @_;
@@ -321,14 +325,18 @@ sub compact ($$) {
        }
        $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr;
        push @$cmd, $src, $dst;
-       my $rd = popen_rd($cmd, undef, $rdr);
+       my ($rd, $pid);
+       local %SIG = %SIG;
+       setup_signals(\&kill_compact, \$pid);
+       ($rd, $pid) = popen_rd($cmd, undef, $rdr);
        while (<$rd>) {
                if ($pr) {
                        s/\r/\r$pfx /g;
                        $pr->("$pfx $_");
                }
        }
-       close $rd or die join(' ', @$cmd)." failed: $?n";
+       waitpid($pid, 0);
+       die "@$cmd failed: \$?=$?\n" if $?;
 }
 
 sub cpdb_loop ($$$;$$) {
@@ -420,7 +428,7 @@ sub cpdb ($$) {
        my $flag = eval($PublicInbox::Search::Xap.'::DB_CREATE()');
        die if $@;
        my $XapianWritableDatabase = $PublicInbox::Search::X{WritableDatabase};
-       $flag |= $PublicInbox::SearchIdx::DB_NO_SYNC if !$opt->{sync};
+       $flag |= $PublicInbox::SearchIdx::DB_NO_SYNC if !$opt->{fsync};
        my $dst = $XapianWritableDatabase->new($tmp, $flag);
        my $pr = $opt->{-progress};
        my $pfx = $opt->{-progress_pfx} = progress_pfx($new);