# (tmpio = [ GLOB, offset, [ length ] ])
package PublicInbox::DS;
use strict;
+use v5.10.1;
+use parent qw(Exporter);
use bytes;
use POSIX qw(WNOHANG);
use IO::Handle qw();
use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
-use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more);
-use 5.010_001;
use Scalar::Util qw(blessed);
use PublicInbox::Syscall qw(:epoll);
use PublicInbox::Tmpfile;
use Errno qw(EAGAIN EINVAL);
use Carp qw(confess carp);
+our @EXPORT_OK = qw(now msg_more dwaitpid);
my $nextq; # queue for next_tick
my $wait_pids; # list of [ pid, callback, callback_arg ]
my $ret = waitpid($pid, WNOHANG);
if ($ret == 0) {
push @$wait_pids, $ary; # autovivifies @$wait_pids
- } elsif ($cb) {
- eval { $cb->($arg, $pid) };
+ } elsif ($ret == $pid) {
+ if ($cb) {
+ eval { $cb->($arg, $pid) };
+ warn "E: dwaitpid($pid) in_loop: $@" if $@;
+ }
+ } else {
+ warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
}
}
# we may not be done, yet, and could've missed/masked a SIGCHLD:
}
}
-# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
-sub dwaitpid ($$$) {
- die "Not in EventLoop\n" unless $in_loop;
- push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ]
-
- # We could've just missed our SIGCHLD, cover it, here:
- enqueue_reap();
+sub dwaitpid ($;$$) {
+ my ($pid, $cb, $arg) = @_;
+ if ($in_loop) {
+ push @$wait_pids, [ $pid, $cb, $arg ];
+ # We could've just missed our SIGCHLD, cover it, here:
+ enqueue_reap();
+ } else {
+ my $ret = waitpid($pid, 0);
+ if ($ret == $pid) {
+ if ($cb) {
+ eval { $cb->($arg, $pid) };
+ warn "E: dwaitpid($pid) !in_loop: $@" if $@;
+ }
+ } else {
+ warn "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
+ }
+ }
}
sub _run_later () {
use PublicInbox::Spawn qw(popen_rd);
use IO::Handle ();
use PublicInbox::Syscall qw(EPOLLONESHOT);
+use PublicInbox::DS qw(dwaitpid);
# fields:
# async_cat => GitAsyncCat ref (read-only pipe)
# sock => writable pipe to Gcf2::loop
sub DESTROY {
my ($self) = @_;
- my $pid = delete $self->{pid};
delete $self->{in};
- return unless $pid;
- eval {
- PublicInbox::DS::dwaitpid($pid, undef, undef);
- $self->close; # we're still in the event loop
- };
- if ($@) { # wait synchronously if not in event loop
- my $sock = delete $self->{sock};
- close $sock if $sock;
- waitpid($pid, 0);
- }
+ # GitAsyncCat::event_step may reap us with WNOHANG, too
+ my $pid = delete $self->{pid} or return;
+ PublicInbox::DS->in_loop ? $self->close : delete($self->{sock});
+ dwaitpid $pid;
}
# used by GitAsyncCat
use IO::Poll qw(POLLIN);
use Carp qw(croak);
use Digest::SHA ();
+use PublicInbox::DS qw(dwaitpid);
our @EXPORT_OK = qw(git_unquote git_quote);
our $PIPE_BUFSIZ = 65536; # Linux default
our $in_cleanup;
# GitAsyncCat::event_step may delete {pid}
my $p = delete $self->{$pid} or return;
-
- # PublicInbox::DS may not be loaded
- eval { PublicInbox::DS::dwaitpid($p, undef, undef) };
- waitpid($p, 0) if $@; # wait synchronously if not in event loop
+ dwaitpid $p;
}
sub cat_async_abort ($) {
use PublicInbox::Config;
use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
use PublicInbox::Sigfd;
-use PublicInbox::DS qw(now);
+use PublicInbox::DS qw(now dwaitpid);
use PublicInbox::Spawn qw(spawn run_die);
use PublicInbox::OnDestroy;
use Text::Wrap qw(wrap);
my ($self, @argv) = @_;
my %rdr = map { $_ => $self->{$_} } (0..2);
my $pid = spawn(['git', @argv], $self->{env}, \%rdr);
- PublicInbox::DS::dwaitpid($pid, \&reap_exec, $self);
+ dwaitpid($pid, \&reap_exec, $self);
}
sub accept_dispatch { # Listener {post_accept} callback
package PublicInbox::ProcessPipe;
use strict;
use v5.10.1;
+use PublicInbox::DS qw(dwaitpid);
sub TIEHANDLE {
my ($class, $pid, $fh, $cb, $arg) = @_;
print { $self->{fh} } @_;
}
+sub adjust_ret { # dwaitpid callback
+ my ($retref, $pid) = @_;
+ $$retref = '' if $?
+}
+
sub CLOSE {
my $fh = delete($_[0]->{fh});
my $ret = defined $fh ? close($fh) : '';
my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)};
if (defined $pid) {
- # PublicInbox::DS may not be loaded
- eval { PublicInbox::DS::dwaitpid($pid, $cb, $arg) };
-
- if ($@) { # ok, not in the event loop, work synchronously
- waitpid($pid, 0);
- $ret = '' if $?;
- $cb->($arg, $pid) if $cb;
+ unless ($cb) {
+ $cb = \&adjust_ret;
+ $arg = \$ret;
}
+ dwaitpid $pid, $cb, $arg;
}
$ret;
}
# operate in. This can be useful to ensure smaller inboxes can
# be cloned while cloning of large inboxes is maxed out.
#
-# This does not depend on PublicInbox::DS or any other external
-# scheduling mechanism, you just need to call start() and finish()
-# appropriately. However, public-inbox-httpd (which uses PublicInbox::DS)
-# will be able to schedule this based on readability of stdout from
-# the spawned process. See GitHTTPBackend.pm and SolverGit.pm for
-# usage examples. It does not depend on any form of threading.
+# This does not depend on the PublicInbox::DS->EventLoop or any
+# other external scheduling mechanism, you just need to call
+# start() and finish() appropriately. However, public-inbox-httpd
+# (which uses PublicInbox::DS) will be able to schedule this
+# based on readability of stdout from the spawned process.
+# See GitHTTPBackend.pm and SolverGit.pm for usage examples.
+# It does not depend on any form of threading.
#
# This is useful for scheduling CGI execution of both long-lived
# git-http-backend(1) process (for "git clone") as well as short-lived
use strict;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(dwaitpid); # doesn't need event loop
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
}
# callback for dwaitpid
-sub waitpid_err ($$) {
- my ($self, $pid) = @_;
- my $xpid = delete $self->{pid};
- my $err;
- if (defined $pid) {
- if ($pid > 0) { # success!
- $err = child_err($?);
- } elsif ($pid < 0) { # ??? does this happen in our case?
- $err = "W: waitpid($xpid, 0) => $pid: $!";
- } # else should not be called with pid == 0
- }
- finalize($self, $err);
-}
-
-sub do_waitpid ($) {
- my ($self) = @_;
- my $pid = $self->{pid};
- # PublicInbox::DS may not be loaded
- eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
- # done if we're running in PublicInbox::DS::EventLoop
- if ($@) {
- # non public-inbox-{httpd,nntpd} callers may block:
- my $ret = waitpid($pid, 0);
- waitpid_err($self, $ret);
- }
-}
+sub waitpid_err { finalize($_[0], child_err($?)) }
sub finish ($;$) {
my ($self, $err) = @_;
if (delete $self->{rpipe}) {
- do_waitpid($self);
+ dwaitpid $self->{pid}, \&waitpid_err, $self;
} else {
finalize($self, $err);
}