X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;fp=lib%2FPublicInbox%2FDS.pm;h=9563a1cbbbb5795fc724c9b3d19b8e76dd66a7ab;hp=e4629e97acc461aaa2debbe1687db8f0aecab796;hb=6e9397d12635eae55c9114ed9689413154fed8ce;hpb=a1ee3e0d84fedc4a2dd4e16e7054ee6fdfbe111a diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e4629e97..9563a1cb 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] +my $AWAIT_PIDS; # pid => [ $callback, @args ] my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( @@ -74,11 +75,11 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $wait_pids || - $ToClose || keys(%DescriptorMap) || + $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS || $PostLoopCallback || keys(%UniqTimer)); $reap_armed = undef; @@ -201,6 +202,13 @@ sub block_signals () { $oldset; } +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} + # We can't use waitpid(-1) safely here since it can hit ``, system(), # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() @@ -208,10 +216,12 @@ sub block_signals () { sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; + my $tmp = $wait_pids // []; $wait_pids = undef; $Stack{reap_runq} = $tmp; my $oldset = block_signals(); + + # old API foreach my $ary (@$tmp) { my ($pid, $cb, $arg) = @$ary; my $ret = waitpid($pid, WNOHANG); @@ -226,6 +236,14 @@ sub reap_pids { warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; } } + + # new API TODO: convert to waitpid(-1) in the future as long + # as we don't use threads + for my $pid (keys %$AWAIT_PIDS) { + my $wpid = waitpid($pid, WNOHANG) // next; + my $cb_args = delete $AWAIT_PIDS->{$wpid} or next; + await_cb($pid, @$cb_args); + } sig_setmask($oldset); delete $Stack{reap_runq}; } @@ -720,6 +738,23 @@ sub dwaitpid ($;$$) { } } +sub awaitpid { + my ($pid, @cb_args) = @_; + $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret = waitpid($pid, 0) // -2; + if ($ret == $pid) { + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); + return $ret; + } + } + # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap() if $in_loop; +} + 1; =head1 AUTHORS (Danga::Socket)