]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/Qspawn.pm
ds: use WNOHANG with waitpid if inside event loop
[public-inbox.git] / lib / PublicInbox / Qspawn.pm
index fb48585c270609390a777e9976a3db8034e95d10..f2e91ab6ab671e67c72806d3da6ac7ee7b01bb6b 100644 (file)
@@ -27,6 +27,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+use POSIX qw(WNOHANG);
 require Plack::Util;
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -73,24 +74,66 @@ sub child_err ($) {
        $msg;
 }
 
-sub finish ($) {
-       my ($self) = @_;
+# callback for dwaitpid
+sub waitpid_err ($$) {
+       my ($self, $pid) = @_;
+       my $xpid = delete $self->{pid};
+       my $err;
+       if ($pid > 0) { # success!
+               $err = child_err($?);
+       } elsif ($pid < 0) { # ??? does this happen in our case?
+               $err = "W: waitpid($xpid, 0) => $pid: $!";
+       } # else should not be called with pid == 0
+
+       # done, spawn whatever's in the queue
        my $limiter = $self->{limiter};
-       my $running;
+       my $running = --$limiter->{running};
+
+       # limiter->{max} may change dynamically
+       if (($running || $limiter->{running}) < $limiter->{max}) {
+               if (my $next = shift @{$limiter->{run_queue}}) {
+                       _do_spawn(@$next);
+               }
+       }
+
+       return unless $err;
+       $self->{err} = $err;
+       my $env = $self->{env} or return;
+       if (!$env->{'qspawn.quiet'}) {
+               $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+               $env->{'psgi.errors'}->print($err);
+       }
+}
+
+sub do_waitpid ($;$) {
+       my ($self, $env) = @_;
+       my $pid = $self->{pid};
+       eval { # PublicInbox::DS may not be loaded
+               PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self);
+               $self->{env} = $env;
+       };
+       # done if we're running in PublicInbox::DS::EventLoop
+       if ($@) {
+               # non public-inbox-{httpd,nntpd} callers may block:
+               my $ret = waitpid($pid, 0);
+               waitpid_err($self, $ret);
+       }
+}
+
+sub finish ($;$) {
+       my ($self, $env) = @_;
        if (delete $self->{rpipe}) {
-               my $pid = delete $self->{pid};
-               $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
-                               "PID:$pid still running?";
-               $running = --$limiter->{running};
+               do_waitpid($self, $env);
        }
 
        # limiter->{max} may change dynamically
-       if (($running || $limiter->{running}) < $limiter->{max}) {
+       my $limiter = $self->{limiter};
+       if ($limiter->{running} < $limiter->{max}) {
                if (my $next = shift @{$limiter->{run_queue}}) {
                        _do_spawn(@$next);
                }
        }
-       $self->{err};
+       $self->{err}; # may be meaningless if non-blocking
 }
 
 sub start {
@@ -104,15 +147,6 @@ sub start {
        }
 }
 
-sub _psgi_finish ($$) {
-       my ($self, $env) = @_;
-       my $err = $self->finish;
-       if ($err && !$env->{'qspawn.quiet'}) {
-               $err = join(' ', @{$self->{args}->[0]}).": $err\n";
-               $env->{'psgi.errors'}->print($err);
-       }
-}
-
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
@@ -121,7 +155,7 @@ sub psgi_qx {
        my ($self, $env, $limiter, $qx_cb) = @_;
        my $qx = PublicInbox::Qspawn::Qx->new;
        my $end = sub {
-               _psgi_finish($self, $env);
+               finish($self, $env);
                eval { $qx_cb->($qx) };
                $qx = undef;
        };
@@ -189,7 +223,7 @@ sub psgi_return {
        my ($self, $env, $limiter, $parse_hdr) = @_;
        my ($fh, $rpipe);
        my $end = sub {
-               _psgi_finish($self, $env);
+               finish($self, $env);
                $fh->close if $fh; # async-only
        };