]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ds: introduce awaitpid, switch ProcessPipe users
authorEric Wong <e@80x24.org>
Tue, 17 Jan 2023 07:19:03 +0000 (07:19 +0000)
committerEric Wong <e@80x24.org>
Wed, 18 Jan 2023 23:25:48 +0000 (23:25 +0000)
awaitpid is the new API which will eventually replace dwaitpid.
It enables early registration of callback handlers.  Eventually
(once dwaitpid is gone) it'll be able to use fewer waitpid
calls.

The avoidance of waitpid(-1) in our earlier days was driven by
the belief that threads may eventually become relevant for Perl 5,
but that's extremely unlikely at this stage.  I will still
introduce optional threads via C, but they definitely won't be
spawning/reaping processes.

Argument order to callbacks is swapped (PID first) to allow
flattened multiple arguments more natrually.  The previous API
(allowing only a single argument, as influenced by
pthread_create(3)) was more tedious as it involved packing
multiple arguments into yet another array.

lib/PublicInbox/DS.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/ProcessPipe.pm
lib/PublicInbox/Qspawn.pm
lib/PublicInbox/Spawn.pm
t/spawn.t

index e4629e97acc461aaa2debbe1687db8f0aecab796..9563a1cbbbb5795fc724c9b3d19b8e76dd66a7ab 100644 (file)
@@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
 
 my %Stack;
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
 my $reap_armed;
 my $ToClose; # sockets to close when event loop is done
 our (
@@ -74,11 +75,11 @@ sub Reset {
                # we may be iterating inside one of these on our stack
                my @q = delete @Stack{keys %Stack};
                for my $q (@q) { @$q = () }
-               $wait_pids = $nextq = $ToClose = undef;
+               $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
                $ep_io = undef; # closes real $Epoll FD
                $Epoll = undef; # may call DSKQXS::DESTROY
        } while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
-               $ToClose || keys(%DescriptorMap) ||
+               $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
                $PostLoopCallback || keys(%UniqTimer));
 
        $reap_armed = undef;
@@ -201,6 +202,13 @@ sub block_signals () {
        $oldset;
 }
 
+sub await_cb ($;@) {
+       my ($pid, @cb_args) = @_;
+       my $cb = shift @cb_args or return;
+       eval { $cb->($pid, @cb_args) };
+       warn "E: awaitpid($pid): $@" if $@;
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -208,10 +216,12 @@ sub block_signals () {
 
 sub reap_pids {
        $reap_armed = undef;
-       my $tmp = $wait_pids or return;
+       my $tmp = $wait_pids // [];
        $wait_pids = undef;
        $Stack{reap_runq} = $tmp;
        my $oldset = block_signals();
+
+       # old API
        foreach my $ary (@$tmp) {
                my ($pid, $cb, $arg) = @$ary;
                my $ret = waitpid($pid, WNOHANG);
@@ -226,6 +236,14 @@ sub reap_pids {
                        warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
                }
        }
+
+       # new API TODO: convert to waitpid(-1) in the future as long
+       # as we don't use threads
+       for my $pid (keys %$AWAIT_PIDS) {
+               my $wpid = waitpid($pid, WNOHANG) // next;
+               my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+               await_cb($pid, @$cb_args);
+       }
        sig_setmask($oldset);
        delete $Stack{reap_runq};
 }
@@ -720,6 +738,23 @@ sub dwaitpid ($;$$) {
        }
 }
 
+sub awaitpid {
+       my ($pid, @cb_args) = @_;
+       $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
+       # provide synchronous API
+       if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+               my $ret = waitpid($pid, 0) // -2;
+               if ($ret == $pid) {
+                       my $cb_args = delete $AWAIT_PIDS->{$pid};
+                       @cb_args = @$cb_args if !@cb_args && $cb_args;
+                       await_cb($pid, @cb_args);
+                       return $ret;
+               }
+       }
+       # We could've just missed our SIGCHLD, cover it, here:
+       enqueue_reap() if $in_loop;
+}
+
 1;
 
 =head1 AUTHORS (Danga::Socket)
index b58e26525fe858cb63079db6d64c7bca9f96785d..1528165a7e00f9571332062c27934c09058b2418 100644 (file)
@@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback
        $self->{lei}->fail("$@ (oid=$oid)") if $@;
 }
 
-sub reap_compress { # dwaitpid callback
-       my ($lei, $pid) = @_;
+sub reap_compress { # awaitpid callback
+       my ($pid, $lei) = @_;
        my $cmd = delete $lei->{"pid.$pid"};
        return if $? == 0;
        $lei->fail("@$cmd failed", $? >> 8);
index 97e9c268f8d1e91aa0f8c1225112dc42cd623768..068631c6944cc1efe2f024c222ada984aac60ca4 100644 (file)
@@ -1,16 +1,25 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # a tied handle for auto reaping of children tied to a pipe, see perltie(1)
 package PublicInbox::ProcessPipe;
-use strict;
-use v5.10.1;
+use v5.12;
 use Carp qw(carp);
+use PublicInbox::DS qw(awaitpid);
+
+sub waitcb { # awaitpid callback
+       my ($pid, $err_ref, $cb, @args) = @_;
+       $$err_ref = $?; # sets >{pp_chld_err} for _close
+       $cb->($pid, @args) if $cb;
+}
 
 sub TIEHANDLE {
-       my ($class, $pid, $fh, $cb, $arg) = @_;
-       bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
-               $class;
+       my ($cls, $pid, $fh, @cb_arg) = @_;
+       my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
+       # we share $err (and not $self) with awaitpid to avoid a ref cycle
+       $self->{pp_chld_err} = \(my $err);
+       awaitpid($pid, \&waitcb, \$err, @cb_arg);
+       $self;
 }
 
 sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip
@@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) }
 
 sub _close ($;$) {
        my ($self, $wait) = @_;
-       my $fh = delete $self->{fh};
+       my ($fh, $pid) = delete(@$self{qw(fh pid)});
        my $ret = defined($fh) ? close($fh) : '';
-       my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
        return $ret unless defined($pid) && $self->{ppid} == $$;
        if ($wait) { # caller cares about the exit status:
-               my $wp = waitpid($pid, 0);
-               if ($wp == $pid) {
-                       $ret = '' if $?;
-                       if ($cb) {
-                               eval { $cb->($arg, $pid) };
-                               carp "E: cb(arg, $pid): $@" if $@;
-                       }
-               } else {
-                       carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
-               }
-       } else { # caller just undef-ed it, let event loop deal with it
-               require PublicInbox::DS;
-               PublicInbox::DS::dwaitpid($pid, $cb, $arg);
+               # synchronous wait via defined(wantarray) on awaitpid:
+               defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
+               ($? = ${$self->{pp_chld_err}}) and $ret = '';
+       } else {
+               awaitpid($pid); # depends on $in_loop or not
        }
        $ret;
 }
index 779b703a0f270cf81196279c7f616a6b7c7283ac..02357dbf83988fb4db2c2223a9f0a58dca17187f 100644 (file)
@@ -28,6 +28,7 @@ package PublicInbox::Qspawn;
 use v5.12;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(awaitpid);
 use Scalar::Util qw(blessed);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -57,35 +58,21 @@ sub _do_spawn {
                }
        }
        $self->{cmd} = $o{quiet} ? undef : $cmd;
+       $o{cb_arg} = [ \&waitpid_err, $self ];
        eval {
                # popen_rd may die on EMFILE, ENFILE
-               $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
-               die "E: $!" unless defined($self->{rpipe});
-
+               $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
                $limiter->{running}++;
                $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
        };
        finish($self, $@) if $@;
 }
 
-sub child_err ($) {
-       my ($child_error) = @_; # typically $?
-       my $exitstatus = ($child_error >> 8) or return;
-       my $sig = $child_error & 127;
-       my $msg = "exit status=$exitstatus";
-       $msg .= " signal=$sig" if $sig;
-       $msg;
-}
-
-sub finalize ($$) {
-       my ($self, $err) = @_;
-
-       my ($env, $qx_cb, $qx_arg, $qx_buf) =
-               delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+       my ($self) = @_;
 
-       # done, spawn whatever's in the queue
-       my $limiter = $self->{limiter};
+       # process is done, spawn whatever's in the queue
+       my $limiter = delete $self->{limiter} or return;
        my $running = --$limiter->{running};
 
        if ($running < $limiter->{max}) {
@@ -93,14 +80,16 @@ sub finalize ($$) {
                        _do_spawn(@$next, $limiter);
                }
        }
-
-       if ($err) {
+       if (my $err = $self->{_err}) { # set by finish or waitpid_err
                utf8::decode($err);
                if (my $dst = $self->{qsp_err}) {
                        $$dst .= $$dst ? " $err" : "; $err";
                }
                warn "@{$self->{cmd}}: $err" if $self->{cmd};
        }
+
+       my ($env, $qx_cb, $qx_arg, $qx_buf) =
+               delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
        if ($qx_cb) {
                eval { $qx_cb->($qx_buf, $qx_arg) };
                return unless $@;
@@ -115,14 +104,28 @@ sub finalize ($$) {
        }
 }
 
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+       my (undef, $self) = @_; # $_[0]: pid
+       $self->{_err} = ''; # for defined check in ->finish
+       if ($?) {
+               my $status = $? >> 8;
+               my $sig = $? & 127;
+               $self->{_err} .= "exit status=$status";
+               $self->{_err} .= " signal=$sig" if $sig;
+       }
+       finalize($self) if !$self->{rpipe};
+}
 
 sub finish ($;$) {
        my ($self, $err) = @_;
-       my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
-       my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
-       @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+       $self->{_err} //= $err; # only for $@
+
+       # we can safely finalize if pipe was closed before, or if
+       # {_err} is defined by waitpid_err.  Deleting {rpipe} will
+       # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+       # but it may not fire right away if inside the event loop.
+       my $closed_before = !delete($self->{rpipe});
+       finalize($self) if $closed_before || defined($self->{_err});
 }
 
 sub start ($$$) {
@@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
        if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
                if ($async) { # calls rpipe->close && ->event_step
                        $async->close; # PublicInbox::HTTPD::Async::close
-               } else { # generic PSGI:
+               } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
                        delete($self->{rpipe})->close;
                        event_step($self);
-                       waitpid_err($self);
                }
                if (ref($r) eq 'ARRAY') { # error
                        $wcb->($r)
index 7f61d8db54cf6bec9daa1cd83b38eeade23f96b3..826ee508801ea1f9d519b16196c118e535997a02 100644 (file)
@@ -365,9 +365,9 @@ sub popen_rd {
        $opt->{1} = fileno($w);
        my $pid = spawn($cmd, $env, $opt);
        return ($r, $pid) if wantarray;
-       my $ret = gensym;
-       tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
-       $ret;
+       my $s = gensym;
+       tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []};
+       $s;
 }
 
 sub run_die ($;$$) {
index 5fc99a2a101c8c2c83466e515a84bba471b9546e..c22cfcfcef1234a62f84ceb213d5fcf547d47020 100644 (file)
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -140,13 +140,13 @@ EOF
 
 { # ->CLOSE vs ->DESTROY waitpid caller distinction
        my @c;
-       my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+       my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
        ok(close($fh), '->CLOSE fired and successful');
        ok(scalar(@c), 'callback fired by ->CLOSE');
        ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
 
        @c = ();
-       $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+       $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
        undef $fh; # ->DESTROY
        ok(scalar(@c), 'callback fired by ->DESTROY');
        ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
@@ -156,8 +156,9 @@ EOF
 { # children don't wait on siblings
        use POSIX qw(_exit);
        pipe(my ($r, $w)) or BAIL_OUT $!;
-       my $cb = sub { warn "x=$$\n" };
-       my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+       my @arg;
+       my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
+       my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
        my $pp = tied *$fh;
        my $pid = fork // BAIL_OUT $!;
        local $SIG{__WARN__} = sub { _exit(1) };
@@ -173,6 +174,9 @@ EOF
        close $w;
        close $fh;
        is($?, 0, 'cat exited');
+       is(scalar(@arg), 2, 'callback got args');
+       is($arg[1], 'hi', 'passed arg');
+       like($arg[0], qr/\A\d+\z/, 'PID');
        is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
 }