awaitpid is the new API which will eventually replace dwaitpid.
It enables early registration of callback handlers. Eventually
(once dwaitpid is gone) it'll be able to use fewer waitpid
calls.
The avoidance of waitpid(-1) in our earlier days was driven by
the belief that threads may eventually become relevant for Perl 5,
but that's extremely unlikely at this stage. I will still
introduce optional threads via C, but they definitely won't be
spawning/reaping processes.
Argument order to callbacks is swapped (PID first) to allow
flattened multiple arguments more natrually. The previous API
(allowing only a single argument, as influenced by
pthread_create(3)) was more tedious as it involved packing
multiple arguments into yet another array.
use PublicInbox::Tmpfile;
use Errno qw(EAGAIN EINVAL);
use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
my %Stack;
my $nextq; # queue for next_tick
my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
my $reap_armed;
my $ToClose; # sockets to close when event loop is done
our (
# we may be iterating inside one of these on our stack
my @q = delete @Stack{keys %Stack};
for my $q (@q) { @$q = () }
- $wait_pids = $nextq = $ToClose = undef;
+ $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
$ep_io = undef; # closes real $Epoll FD
$Epoll = undef; # may call DSKQXS::DESTROY
} while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
- $ToClose || keys(%DescriptorMap) ||
+ $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
$PostLoopCallback || keys(%UniqTimer));
$reap_armed = undef;
$oldset;
}
+sub await_cb ($;@) {
+ my ($pid, @cb_args) = @_;
+ my $cb = shift @cb_args or return;
+ eval { $cb->($pid, @cb_args) };
+ warn "E: awaitpid($pid): $@" if $@;
+}
+
# We can't use waitpid(-1) safely here since it can hit ``, system(),
# and other things. So we scan the $wait_pids list, which is hopefully
# not too big. We keep $wait_pids small by not calling dwaitpid()
sub reap_pids {
$reap_armed = undef;
- my $tmp = $wait_pids or return;
+ my $tmp = $wait_pids // [];
$wait_pids = undef;
$Stack{reap_runq} = $tmp;
my $oldset = block_signals();
+
+ # old API
foreach my $ary (@$tmp) {
my ($pid, $cb, $arg) = @$ary;
my $ret = waitpid($pid, WNOHANG);
warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
}
}
+
+ # new API TODO: convert to waitpid(-1) in the future as long
+ # as we don't use threads
+ for my $pid (keys %$AWAIT_PIDS) {
+ my $wpid = waitpid($pid, WNOHANG) // next;
+ my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+ await_cb($pid, @$cb_args);
+ }
sig_setmask($oldset);
delete $Stack{reap_runq};
}
}
}
+sub awaitpid {
+ my ($pid, @cb_args) = @_;
+ $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
+ # provide synchronous API
+ if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+ my $ret = waitpid($pid, 0) // -2;
+ if ($ret == $pid) {
+ my $cb_args = delete $AWAIT_PIDS->{$pid};
+ @cb_args = @$cb_args if !@cb_args && $cb_args;
+ await_cb($pid, @cb_args);
+ return $ret;
+ }
+ }
+ # We could've just missed our SIGCHLD, cover it, here:
+ enqueue_reap() if $in_loop;
+}
+
1;
=head1 AUTHORS (Danga::Socket)
$self->{lei}->fail("$@ (oid=$oid)") if $@;
}
-sub reap_compress { # dwaitpid callback
- my ($lei, $pid) = @_;
+sub reap_compress { # awaitpid callback
+ my ($pid, $lei) = @_;
my $cmd = delete $lei->{"pid.$pid"};
return if $? == 0;
$lei->fail("@$cmd failed", $? >> 8);
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# a tied handle for auto reaping of children tied to a pipe, see perltie(1)
package PublicInbox::ProcessPipe;
-use strict;
-use v5.10.1;
+use v5.12;
use Carp qw(carp);
+use PublicInbox::DS qw(awaitpid);
+
+sub waitcb { # awaitpid callback
+ my ($pid, $err_ref, $cb, @args) = @_;
+ $$err_ref = $?; # sets >{pp_chld_err} for _close
+ $cb->($pid, @args) if $cb;
+}
sub TIEHANDLE {
- my ($class, $pid, $fh, $cb, $arg) = @_;
- bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
- $class;
+ my ($cls, $pid, $fh, @cb_arg) = @_;
+ my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
+ # we share $err (and not $self) with awaitpid to avoid a ref cycle
+ $self->{pp_chld_err} = \(my $err);
+ awaitpid($pid, \&waitcb, \$err, @cb_arg);
+ $self;
}
sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip
sub _close ($;$) {
my ($self, $wait) = @_;
- my $fh = delete $self->{fh};
+ my ($fh, $pid) = delete(@$self{qw(fh pid)});
my $ret = defined($fh) ? close($fh) : '';
- my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
return $ret unless defined($pid) && $self->{ppid} == $$;
if ($wait) { # caller cares about the exit status:
- my $wp = waitpid($pid, 0);
- if ($wp == $pid) {
- $ret = '' if $?;
- if ($cb) {
- eval { $cb->($arg, $pid) };
- carp "E: cb(arg, $pid): $@" if $@;
- }
- } else {
- carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
- }
- } else { # caller just undef-ed it, let event loop deal with it
- require PublicInbox::DS;
- PublicInbox::DS::dwaitpid($pid, $cb, $arg);
+ # synchronous wait via defined(wantarray) on awaitpid:
+ defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
+ ($? = ${$self->{pp_chld_err}}) and $ret = '';
+ } else {
+ awaitpid($pid); # depends on $in_loop or not
}
$ret;
}
use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(awaitpid);
use Scalar::Util qw(blessed);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
}
}
$self->{cmd} = $o{quiet} ? undef : $cmd;
+ $o{cb_arg} = [ \&waitpid_err, $self ];
eval {
# popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
- die "E: $!" unless defined($self->{rpipe});
-
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
$limiter->{running}++;
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
finish($self, $@) if $@;
}
-sub child_err ($) {
- my ($child_error) = @_; # typically $?
- my $exitstatus = ($child_error >> 8) or return;
- my $sig = $child_error & 127;
- my $msg = "exit status=$exitstatus";
- $msg .= " signal=$sig" if $sig;
- $msg;
-}
-
-sub finalize ($$) {
- my ($self, $err) = @_;
-
- my ($env, $qx_cb, $qx_arg, $qx_buf) =
- delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+ my ($self) = @_;
- # done, spawn whatever's in the queue
- my $limiter = $self->{limiter};
+ # process is done, spawn whatever's in the queue
+ my $limiter = delete $self->{limiter} or return;
my $running = --$limiter->{running};
if ($running < $limiter->{max}) {
_do_spawn(@$next, $limiter);
}
}
-
- if ($err) {
+ if (my $err = $self->{_err}) { # set by finish or waitpid_err
utf8::decode($err);
if (my $dst = $self->{qsp_err}) {
$$dst .= $$dst ? " $err" : "; $err";
}
warn "@{$self->{cmd}}: $err" if $self->{cmd};
}
+
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
if ($qx_cb) {
eval { $qx_cb->($qx_buf, $qx_arg) };
return unless $@;
}
}
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+ my (undef, $self) = @_; # $_[0]: pid
+ $self->{_err} = ''; # for defined check in ->finish
+ if ($?) {
+ my $status = $? >> 8;
+ my $sig = $? & 127;
+ $self->{_err} .= "exit status=$status";
+ $self->{_err} .= " signal=$sig" if $sig;
+ }
+ finalize($self) if !$self->{rpipe};
+}
sub finish ($;$) {
my ($self, $err) = @_;
- my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
- my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
- @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+ $self->{_err} //= $err; # only for $@
+
+ # we can safely finalize if pipe was closed before, or if
+ # {_err} is defined by waitpid_err. Deleting {rpipe} will
+ # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+ # but it may not fire right away if inside the event loop.
+ my $closed_before = !delete($self->{rpipe});
+ finalize($self) if $closed_before || defined($self->{_err});
}
sub start ($$$) {
if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
if ($async) { # calls rpipe->close && ->event_step
$async->close; # PublicInbox::HTTPD::Async::close
- } else { # generic PSGI:
+ } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
delete($self->{rpipe})->close;
event_step($self);
- waitpid_err($self);
}
if (ref($r) eq 'ARRAY') { # error
$wcb->($r)
$opt->{1} = fileno($w);
my $pid = spawn($cmd, $env, $opt);
return ($r, $pid) if wantarray;
- my $ret = gensym;
- tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
- $ret;
+ my $s = gensym;
+ tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []};
+ $s;
}
sub run_die ($;$$) {
{ # ->CLOSE vs ->DESTROY waitpid caller distinction
my @c;
- my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+ my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
ok(close($fh), '->CLOSE fired and successful');
ok(scalar(@c), 'callback fired by ->CLOSE');
ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
@c = ();
- $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+ $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
undef $fh; # ->DESTROY
ok(scalar(@c), 'callback fired by ->DESTROY');
ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
{ # children don't wait on siblings
use POSIX qw(_exit);
pipe(my ($r, $w)) or BAIL_OUT $!;
- my $cb = sub { warn "x=$$\n" };
- my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+ my @arg;
+ my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
+ my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
my $pp = tied *$fh;
my $pid = fork // BAIL_OUT $!;
local $SIG{__WARN__} = sub { _exit(1) };
close $w;
close $fh;
is($?, 0, 'cat exited');
+ is(scalar(@arg), 2, 'callback got args');
+ is($arg[1], 'hi', 'passed arg');
+ like($arg[0], qr/\A\d+\z/, 'PID');
is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
}