]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ds: use WNOHANG with waitpid if inside event loop
authorEric Wong <e@80x24.org>
Mon, 8 Jul 2019 07:01:59 +0000 (07:01 +0000)
committerEric Wong <e@80x24.org>
Mon, 8 Jul 2019 07:10:35 +0000 (07:10 +0000)
While we're usually not stuck waiting on waitpid after
seeing a pipe EOF or even triggering SIGPIPE in the process
(e.g. git-http-backend) we're reading from, it MAY happen
and we should be careful to never hang the daemon process
on waitpid calls.

v2: use "eq" for string comparison against 'DEFAULT'

lib/PublicInbox/DS.pm
lib/PublicInbox/Git.pm
lib/PublicInbox/ProcessPipe.pm
lib/PublicInbox/Qspawn.pm

index 586c47cdc8cef94e05509dfbf0a82136a147f870..51515bf6b5730c3ddadb374a4e6aaa02b3f6a31d 100644 (file)
@@ -16,7 +16,7 @@
 package PublicInbox::DS;
 use strict;
 use bytes;
-use POSIX ();
+use POSIX qw(WNOHANG);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -38,6 +38,8 @@ use Carp   qw(croak confess carp);
 require File::Spec;
 
 my $nextq = []; # queue for next_tick
+my $WaitPids = [];               # list of [ pid, callback, callback_arg ]
+my $reap_timer;
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -64,6 +66,8 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
+    $WaitPids = [];
+    $reap_timer = undef;
     @ToClose = ();
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
@@ -215,7 +219,33 @@ sub RunTimers {
     return $timeout;
 }
 
+# We can't use waitpid(-1) safely here since it can hit ``, system(),
+# and other things.  So we scan the $WaitPids list, which is hopefully
+# not too big.
+sub reap_pids {
+    my $tmp = $WaitPids;
+    $WaitPids = [];
+    $reap_timer = undef;
+    foreach my $ary (@$tmp) {
+        my ($pid, $cb, $arg) = @$ary;
+        my $ret = waitpid($pid, WNOHANG);
+        if ($ret == 0) {
+            push @$WaitPids, $ary;
+        } elsif ($cb) {
+            eval { $cb->($arg, $pid) };
+        }
+    }
+    if (@$WaitPids) {
+        # we may not be donea, and we may miss our
+        $reap_timer = AddTimer(undef, 1, \&reap_pids);
+    }
+}
+
+# reentrant SIGCHLD handler (since reap_pids is not reentrant)
+sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+
 sub EpollEventLoop {
+    local $SIG{CHLD} = \&enqueue_reap;
     while (1) {
         my @events;
         my $i;
@@ -595,6 +625,21 @@ sub shutdn ($) {
        $self->close;
     }
 }
+
+# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
+sub dwaitpid ($$$) {
+    my ($pid, $cb, $arg) = @_;
+    my $chld = $SIG{CHLD};
+    if (defined($chld) && $chld eq \&enqueue_reap) {
+        push @$WaitPids, [ $pid, $cb, $arg ];
+
+        # We could've just missed our SIGCHLD, cover it, here:
+        requeue(\&reap_pids);
+    } else {
+        die "Not in EventLoop\n";
+    }
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
index f5c7a95c34d7aedad7860a1189f8e121167eb1fb..2f1a61f97f5e1da13efac6e3c36b3760390fb30d 100644 (file)
@@ -198,7 +198,10 @@ sub _destroy {
        my $p = delete $self->{$pid} or return;
        delete @$self{($in, $out)};
        delete $self->{$err} if $err; # `err_c'
-       waitpid $p, 0;
+
+       # PublicInbox::DS may not be loaded
+       eval { PublicInbox::DS::dwaitpid($p, undef, undef) };
+       waitpid($p, 0) if $@; # wait synchronously if not in event loop
 }
 
 sub fail {
index 2769e064ca21d1651fbace905a167ba17922802b..4f5fc7515c33f91d44e0122d4487fff99dbac6ce 100644 (file)
@@ -20,8 +20,13 @@ sub CLOSE {
        my $ret = defined $fh ? close($fh) : '';
        my $pid = delete $_[0]->{pid};
        if (defined $pid) {
-               waitpid($pid, 0);
-               $ret = '' if $?;
+               # PublicInbox::DS may not be loaded
+               eval { PublicInbox::DS::dwaitpid($pid, undef, undef) };
+
+               if ($@) { # ok, not in the event loop, work synchronously
+                       waitpid($pid, 0);
+                       $ret = '' if $?;
+               }
        }
        $ret;
 }
index fb48585c270609390a777e9976a3db8034e95d10..f2e91ab6ab671e67c72806d3da6ac7ee7b01bb6b 100644 (file)
@@ -27,6 +27,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+use POSIX qw(WNOHANG);
 require Plack::Util;
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -73,24 +74,66 @@ sub child_err ($) {
        $msg;
 }
 
-sub finish ($) {
-       my ($self) = @_;
+# callback for dwaitpid
+sub waitpid_err ($$) {
+       my ($self, $pid) = @_;
+       my $xpid = delete $self->{pid};
+       my $err;
+       if ($pid > 0) { # success!
+               $err = child_err($?);
+       } elsif ($pid < 0) { # ??? does this happen in our case?
+               $err = "W: waitpid($xpid, 0) => $pid: $!";
+       } # else should not be called with pid == 0
+
+       # done, spawn whatever's in the queue
        my $limiter = $self->{limiter};
-       my $running;
+       my $running = --$limiter->{running};
+
+       # limiter->{max} may change dynamically
+       if (($running || $limiter->{running}) < $limiter->{max}) {
+               if (my $next = shift @{$limiter->{run_queue}}) {
+                       _do_spawn(@$next);
+               }
+       }
+
+       return unless $err;
+       $self->{err} = $err;
+       my $env = $self->{env} or return;
+       if (!$env->{'qspawn.quiet'}) {
+               $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+               $env->{'psgi.errors'}->print($err);
+       }
+}
+
+sub do_waitpid ($;$) {
+       my ($self, $env) = @_;
+       my $pid = $self->{pid};
+       eval { # PublicInbox::DS may not be loaded
+               PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self);
+               $self->{env} = $env;
+       };
+       # done if we're running in PublicInbox::DS::EventLoop
+       if ($@) {
+               # non public-inbox-{httpd,nntpd} callers may block:
+               my $ret = waitpid($pid, 0);
+               waitpid_err($self, $ret);
+       }
+}
+
+sub finish ($;$) {
+       my ($self, $env) = @_;
        if (delete $self->{rpipe}) {
-               my $pid = delete $self->{pid};
-               $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
-                               "PID:$pid still running?";
-               $running = --$limiter->{running};
+               do_waitpid($self, $env);
        }
 
        # limiter->{max} may change dynamically
-       if (($running || $limiter->{running}) < $limiter->{max}) {
+       my $limiter = $self->{limiter};
+       if ($limiter->{running} < $limiter->{max}) {
                if (my $next = shift @{$limiter->{run_queue}}) {
                        _do_spawn(@$next);
                }
        }
-       $self->{err};
+       $self->{err}; # may be meaningless if non-blocking
 }
 
 sub start {
@@ -104,15 +147,6 @@ sub start {
        }
 }
 
-sub _psgi_finish ($$) {
-       my ($self, $env) = @_;
-       my $err = $self->finish;
-       if ($err && !$env->{'qspawn.quiet'}) {
-               $err = join(' ', @{$self->{args}->[0]}).": $err\n";
-               $env->{'psgi.errors'}->print($err);
-       }
-}
-
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
@@ -121,7 +155,7 @@ sub psgi_qx {
        my ($self, $env, $limiter, $qx_cb) = @_;
        my $qx = PublicInbox::Qspawn::Qx->new;
        my $end = sub {
-               _psgi_finish($self, $env);
+               finish($self, $env);
                eval { $qx_cb->($qx) };
                $qx = undef;
        };
@@ -189,7 +223,7 @@ sub psgi_return {
        my ($self, $env, $limiter, $parse_hdr) = @_;
        my ($fh, $rpipe);
        my $end = sub {
-               _psgi_finish($self, $env);
+               finish($self, $env);
                $fh->close if $fh; # async-only
        };