]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ipc: children don't kill on DESTROY, reduce FD sharing
authorEric Wong <e@80x24.org>
Sat, 16 Jan 2021 11:36:22 +0000 (23:36 -1200)
committerEric Wong <e@80x24.org>
Mon, 18 Jan 2021 09:25:31 +0000 (09:25 +0000)
Children should not be blindly killing siblings on ->DESTROY
since they're typically shorter-lived than parents.  We'll
also be more careful about on-stack variables and now we
can rely exclusively on delete ops to close FDs.

We also need to fix our SIGPIPE handling for the oneshot case
while fixing a typo for delete, so we write "!" to the EOF pipe
to ensure the parent oneshot process exits on the first worker
that hits SIGPIPE, rather than waiting for the last worker to
hit SIGPIPE.

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiXSearch.pm

index fbc91f6f96ea013252721c2c3127f70a6213d979..78cb8400d23758416c5859cdc468cef0f11144b0 100644 (file)
@@ -104,11 +104,11 @@ sub ipc_worker_spawn {
        pipe(my ($r_req, $w_req)) or die "pipe: $!";
        pipe(my ($r_res, $w_res)) or die "pipe: $!";
        my $sigset = $oldset // PublicInbox::DS::block_signals();
-       my $parent = $$;
        $self->ipc_atfork_prepare;
        defined(my $pid = fork) or die "fork: $!";
        if ($pid == 0) {
                eval { PublicInbox::DS->Reset };
+               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                $w_req = $r_res = undef;
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
@@ -283,8 +283,7 @@ sub _wq_worker_start ($$) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
                eval { PublicInbox::DS->Reset };
-               close(delete $self->{-wq_s1});
-               delete $self->{qw(-wq_workers -wq_ppid)};
+               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
                $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
                local $0 = $self->{-wq_ident};
@@ -306,16 +305,15 @@ sub wq_workers_start {
        my ($self, $ident, $nr_workers, $oldset) = @_;
        ($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
        return if $self->{-wq_s1}; # idempotent
-       my ($s1, $s2);
-       socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+       $self->{-wq_s1} = $self->{-wq_s2} = undef;
+       socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
+               die "socketpair: $!";
        $self->ipc_atfork_prepare;
        $nr_workers //= 4;
        $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
        my $sigset = $oldset // PublicInbox::DS::block_signals();
        $self->{-wq_workers} = {};
        $self->{-wq_ident} = $ident;
-       $self->{-wq_s1} = $s1;
-       $self->{-wq_s2} = $s2;
        _wq_worker_start($self, $sigset) for (1..$nr_workers);
        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
        $self->{-wq_ppid} = $$;
@@ -377,6 +375,7 @@ sub wq_close {
        my $ppid = delete $self->{-wq_ppid} or return;
        my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
        return if $ppid != $$; # can't reap siblings or parents
+       return (keys %$workers) if wantarray; # caller will reap
        for my $pid (keys %$workers) {
                dwaitpid($pid, \&ipc_worker_reap, $self);
        }
@@ -391,9 +390,11 @@ sub wq_kill {
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
-       wq_kill($_[0]);
-       wq_close($_[0]);
-       ipc_worker_stop($_[0]);
+       my ($self) = @_;
+       my $ppid = $self->{-wq_ppid};
+       wq_kill($self) if $ppid && $ppid == $$;
+       wq_close($self);
+       ipc_worker_stop($self);
 }
 
 1;
index 2784ca6b85d26936aac16aa24dcc96e43b456c39..5e6eb0aff7a548e78975b46b922e070780ee7766 100644 (file)
@@ -287,6 +287,7 @@ sub atfork_child_wq {
                $self->x_it(13); # SIGPIPE = 13
                # we need to close explicitly to avoid Perl warning on SIGPIPE
                close($_) for (delete @$self{1..2});
+               syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
                die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
        });
 }
index 25ded54441213a3e9651b1d01f2dac106bdc36fe..8b70167cb71d943ef582061e8fadecefdc8a9047 100644 (file)
@@ -8,6 +8,7 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::DS qw(dwaitpid);
 
 sub new {
        my ($class) = @_;
@@ -181,8 +182,14 @@ sub do_query {
                $lei_orig->{lxs} = $self;
                $lei_orig->event_step_init;
        } else {
-               $self->wq_close;
-               read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+               my @pids = $self->wq_close;
+               # wait for close($lei->{0})
+               if (read($eof_wait, my $buf, 1)) {
+                       # if we get a SIGPIPE from one, kill the rest
+                       kill('TERM', @pids) if $buf eq '!';
+               }
+               my $ipc_worker_reap = $self->can('ipc_worker_reap');
+               dwaitpid($_, $ipc_worker_reap, $self) for @pids;
                query_done($lei_orig); # may SIGPIPE
        }
 }