]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ds: block signals when reaping
authorEric Wong <e@80x24.org>
Sun, 10 Jan 2021 12:15:00 +0000 (12:15 +0000)
committerEric Wong <e@80x24.org>
Tue, 12 Jan 2021 03:51:42 +0000 (03:51 +0000)
This lets us call dwaitpid long before a process exits
and not have to wait around for it.

This is advantageous for lei where we can run dwaitpid on the
pager as soon as we spawn it, instead of waiting for a client
socket to go away on DESTROY.

lib/PublicInbox/DS.pm
lib/PublicInbox/Daemon.pm
lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/Sigfd.pm
lib/PublicInbox/Watch.pm
script/public-inbox-watch
t/spawn.t

index 8a560ae8b4ad53be049d051350efde8af98e0b9b..40994fd44a085f286f13eb8126c69d98d1717747 100644 (file)
@@ -24,7 +24,7 @@ use strict;
 use v5.10.1;
 use parent qw(Exporter);
 use bytes;
-use POSIX qw(WNOHANG);
+use POSIX qw(WNOHANG sigprocmask SIG_SETMASK);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -202,6 +202,16 @@ sub RunTimers {
     ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout;
 }
 
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
+
+sub block_signals () {
+       my $oldset = POSIX::SigSet->new;
+       my $newset = POSIX::SigSet->new;
+       $newset->fillset or die "fillset: $!";
+       sig_setmask($newset, $oldset);
+       $oldset;
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -211,6 +221,7 @@ sub reap_pids {
        $reap_armed = undef;
        my $tmp = $wait_pids or return;
        $wait_pids = undef;
+       my $oldset = block_signals();
        foreach my $ary (@$tmp) {
                my ($pid, $cb, $arg) = @$ary;
                my $ret = waitpid($pid, WNOHANG);
@@ -225,8 +236,7 @@ sub reap_pids {
                        warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
                }
        }
-       # we may not be done, yet, and could've missed/masked a SIGCHLD:
-       $reap_armed //= requeue(\&reap_pids) if $wait_pids;
+       sig_setmask($oldset);
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
index 4dcb5fb6f927f416b0e4b2574b7b965535be3637..4b738b7c56c7bbc06ce8777ce2a7a63e7dbb75f5 100644 (file)
@@ -77,7 +77,7 @@ sub accept_tls_opt ($) {
 sub daemon_prepare ($) {
        my ($default_listen) = @_;
        my $listener_names = {}; # sockname => IO::Handle
-       $oldset = PublicInbox::Sigfd::block_signals();
+       $oldset = PublicInbox::DS::block_signals();
        @CMD = ($0, @ARGV);
        my ($prog) = ($CMD[0] =~ m!([^/]+)\z!g);
        my $help = <<EOF;
@@ -515,7 +515,7 @@ EOF
        };
        my $sigfd = PublicInbox::Sigfd->new($sig, 0);
        local %SIG = (%SIG, %$sig) if !$sigfd;
-       PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd;
+       PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
        while (1) { # main loop
                my $n = scalar keys %pids;
                unless (@listeners) {
@@ -531,7 +531,7 @@ EOF
                }
                my $want = $worker_processes - 1;
                if ($n <= $want) {
-                       PublicInbox::Sigfd::block_signals() if !$sigfd;
+                       PublicInbox::DS::block_signals() if !$sigfd;
                        for my $i ($n..$want) {
                                my $pid = fork;
                                if (!defined $pid) {
@@ -544,7 +544,7 @@ EOF
                                        $pids{$pid} = $i;
                                }
                        }
-                       PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd;
+                       PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
                }
 
                if ($sigfd) { # Linux and IO::KQueue users:
@@ -632,7 +632,7 @@ sub daemon_loop ($$$$) {
        if (!$sigfd) {
                # wake up every second to accept signals if we don't
                # have signalfd or IO::KQueue:
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        PublicInbox::DS->EventLoop;
index e6c21866b084fd4aa915cd6ac4c13c1194f8c6d1..85959a95c8c81e7534362635b8b0a8a10d0855ae 100644 (file)
@@ -1090,7 +1090,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        $pr->("performing initial scan ...\n") if $pr;
        my $sync = eidx_sync($self, $opt); # initial sync
        return if $sync->{quit};
-       my $oldset = PublicInbox::Sigfd::block_signals();
+       my $oldset = PublicInbox::DS::block_signals();
        local $self->{current_info} = '';
        my $cb = $SIG{__WARN__} || \&CORE::warn;
        local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
@@ -1108,7 +1108,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        if (!$sigfd) {
                # wake up every second to accept signals if we don't
                # have signalfd or IO::KQueue:
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
index c1f6f9200793b17ff2dd8939845ddf1174e4267c..81623fc0a807ce871736b7b4f4ec5db3a24cf566 100644 (file)
@@ -81,7 +81,7 @@ sub ipc_worker_spawn {
        delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
        pipe(my ($r_req, $w_req)) or die "pipe: $!";
        pipe(my ($r_res, $w_res)) or die "pipe: $!";
-       my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+       my $sigset = $oldset // PublicInbox::DS::block_signals();
        my $parent = $$;
        $self->ipc_atfork_parent;
        defined(my $pid = fork) or die "fork: $!";
@@ -92,13 +92,13 @@ sub ipc_worker_spawn {
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                local $0 = $ident;
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                my $on_destroy = $self->ipc_atfork_child;
                eval { ipc_worker_loop($self, $r_req, $w_res) };
                die "worker $ident PID:$$ died: $@\n" if $@;
                exit;
        }
-       PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+       PublicInbox::DS::sig_setmask($sigset) unless $oldset;
        $r_req = $w_res = undef;
        $w_req->autoflush(1);
        $self->{-ipc_req} = $w_req;
index a5658e6d1a50a7a9414c375948a2a1d3ec553cae..12e227d2ace7d183161235d9a77d8027ffb2dee8 100644 (file)
@@ -607,7 +607,8 @@ sub start_pager {
        my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
        $self->{1} = $w;
        $self->{2} = $w if -t $self->{2};
-       $self->{'pager.pid'} = spawn([$pager], $env, $rdr);
+       my $pid = spawn([$pager], $env, $rdr);
+       dwaitpid($pid, undef, $self->{sock});
        $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
 }
 
@@ -689,7 +690,7 @@ sub lazy_start {
        my @st = stat($path) or die "stat($path): $!";
        my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
        pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-       my $oldset = PublicInbox::Sigfd::block_signals();
+       my $oldset = PublicInbox::DS::block_signals();
        if ($nfd == 1) {
                require IO::FDPass;
                $recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
@@ -736,7 +737,7 @@ sub lazy_start {
        } else {
                # wake up every second to accept signals if we don't
                # have signalfd or IO::KQueue:
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        PublicInbox::DS->SetPostLoopCallback(sub {
@@ -801,9 +802,6 @@ sub oneshot {
 sub DESTROY {
        my ($self) = @_;
        $self->{1}->autoflush(1);
-       if (my $pid = delete $self->{'pager.pid'}) {
-               dwaitpid($pid, undef, $self->{sock});
-       }
 }
 
 1;
index db0bf523557c75ed79b0337fff69ffa1efeac96d..a4d1b3bba81a26bc9a74e91a630d824c7ae1aa57 100644 (file)
@@ -7,7 +7,7 @@ package PublicInbox::Sigfd;
 use strict;
 use parent qw(PublicInbox::DS);
 use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
-use POSIX qw(:signal_h);
+use POSIX ();
 use IO::Handle ();
 
 # returns a coderef to unblock signals if neither signalfd or kqueue
@@ -63,14 +63,4 @@ sub event_step {
        while (wait_once($_[0])) {} # non-blocking
 }
 
-sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
-
-sub block_signals () {
-       my $oldset = POSIX::SigSet->new;
-       my $newset = POSIX::SigSet->new;
-       $newset->fillset or die "fillset: $!";
-       sig_setmask($newset, $oldset);
-       $oldset;
-}
-
 1;
index c39ce1a7d2f233b55ba42d9256e8c9c32200a5af..9a7291404404cdddd8250f2ba29bd02dac7de31e 100644 (file)
@@ -583,13 +583,13 @@ sub watch_atfork_child ($) {
        delete $self->{opendirs};
        PublicInbox::DS->Reset;
        %SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT');
-       PublicInbox::Sigfd::sig_setmask($self->{oldset});
+       PublicInbox::DS::sig_setmask($self->{oldset});
 }
 
 sub watch_atfork_parent ($) {
        my ($self) = @_;
        _done_for_now($self);
-       PublicInbox::Sigfd::block_signals();
+       PublicInbox::DS::block_signals();
 }
 
 sub imap_idle_requeue ($) { # DS::add_timer callback
@@ -648,7 +648,7 @@ sub event_step {
                                imap_idle_fork($self, $url_intvl);
                        }
                };
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                die $@ if $@;
        }
        fs_scan_step($self) if $self->{mdre};
@@ -716,7 +716,7 @@ sub poll_fetch_fork ($) { # DS::add_timer callback
                close $w;
                _exit(0);
        }
-       PublicInbox::Sigfd::sig_setmask($oldset);
+       PublicInbox::DS::sig_setmask($oldset);
        die "fork: $!"  unless defined $pid;
        $self->{poll_pids}->{$pid} = [ $intvl, $urls ];
        PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
index 9ada9c3b22e11756baeb429a63c4619ff1237fe7..10c7cd6f97fdfabeac71117dae93788779bbae0a 100755 (executable)
@@ -19,7 +19,7 @@ my $do_scan = 1;
 GetOptions('scan!' => \$do_scan, # undocumented, testing only
        'help|h' => \(my $show_help)) or do { print STDERR $help; exit 1 };
 if ($show_help) { print $help; exit 0 };
-my $oldset = PublicInbox::Sigfd::block_signals();
+my $oldset = PublicInbox::DS::block_signals();
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
 local $0 = $0; # local since this script may be eval-ed
@@ -60,7 +60,7 @@ if ($watch) {
        my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
        local %SIG = (%SIG, %$sig) if !$sigfd;
        if (!$sigfd) {
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        $watch->watch($sig, $oldset) while ($watch);
index 891a370211e28ff17035dc12d07d8b8bbe09021a..558afc289b6277712f754632157525764183cbe4 100644 (file)
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -61,7 +61,7 @@ elsif ($pid > 0) {
        select(undef, undef, undef, 0.01) while 1;
 }
 EOF
-       my $oldset = PublicInbox::Sigfd::block_signals();
+       my $oldset = PublicInbox::DS::block_signals();
        my $rd = popen_rd([$^X, '-e', $script]);
        diag 'waiting for child to reap grandchild...';
        chomp(my $line = readline($rd));
@@ -70,7 +70,7 @@ EOF
        ok(kill('CHLD', $pid), 'sent SIGCHLD to child');
        is(readline($rd), "HI\n", '$SIG{CHLD} works in child');
        ok(close $rd, 'popen_rd close works');
-       PublicInbox::Sigfd::sig_setmask($oldset);
+       PublicInbox::DS::sig_setmask($oldset);
 }
 
 {