package PublicInbox::DS;
use strict;
use bytes;
-use POSIX ();
+use POSIX qw(WNOHANG);
use IO::Handle qw();
use Fcntl qw(SEEK_SET :DEFAULT);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
require File::Spec;
my $nextq = []; # queue for next_tick
+my $WaitPids = []; # list of [ pid, callback, callback_arg ]
+my $reap_timer;
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Epoll, # Global epoll fd (or DSKQXS ref)
=cut
sub Reset {
%DescriptorMap = ();
+ $WaitPids = [];
+ $reap_timer = undef;
@ToClose = ();
$LoopTimeout = -1; # no timeout by default
@Timers = ();
return $timeout;
}
+# We can't use waitpid(-1) safely here since it can hit ``, system(),
+# and other things. So we scan the $WaitPids list, which is hopefully
+# not too big.
+sub reap_pids {
+ my $tmp = $WaitPids;
+ $WaitPids = [];
+ $reap_timer = undef;
+ foreach my $ary (@$tmp) {
+ my ($pid, $cb, $arg) = @$ary;
+ my $ret = waitpid($pid, WNOHANG);
+ if ($ret == 0) {
+ push @$WaitPids, $ary;
+ } elsif ($cb) {
+ eval { $cb->($arg, $pid) };
+ }
+ }
+ if (@$WaitPids) {
+ # we may not be donea, and we may miss our
+ $reap_timer = AddTimer(undef, 1, \&reap_pids);
+ }
+}
+
+# reentrant SIGCHLD handler (since reap_pids is not reentrant)
+sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+
sub EpollEventLoop {
+ local $SIG{CHLD} = \&enqueue_reap;
while (1) {
my @events;
my $i;
# now we can close sockets that wanted to close during our event processing.
# (we didn't want to close them during the loop, as we didn't want fd numbers
# being reused and confused during the event loop)
- while (my $sock = shift @ToClose) {
- my $fd = fileno($sock);
-
- # close the socket. (not a PublicInbox::DS close)
- CORE::close($sock);
-
- # and now we can finally remove the fd from the map. see
- # comment above in ->close.
- delete $DescriptorMap{$fd};
- }
-
+ delete($DescriptorMap{fileno($_)}) for @ToClose;
+ @ToClose = (); # let refcounting drop everything all at once
# by default we keep running, unless a postloop callback (either per-object
# or global) cancels it
if ($! == EAGAIN) {
epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
rbuf_idle($self, $rbuf);
+ 0;
} else {
$self->close;
}
return 0;
}
}
- $self->write(\($_[1]));
+
+ # don't redispatch into NNTPdeflate::write
+ PublicInbox::DS::write($self, \($_[1]));
}
sub epwait ($$) {
my ($sock, $ev) = @_;
epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
confess("EPOLL_CTL_MOD $!");
- 0;
-}
-
-sub watch ($$) {
- my ($self, $ev) = @_;
- my $sock = $self->{sock} or return;
- epwait($sock, $ev);
}
# return true if complete, false if incomplete (or failure)
return $self->close if $! != EAGAIN;
epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ 0;
}
+# return true if complete, false if incomplete (or failure)
sub shutdn_tls_step ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
return $self->close if $! != EAGAIN;
epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+ 0;
}
# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
$self->close;
}
}
+
+# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
+sub dwaitpid ($$$) {
+ my ($pid, $cb, $arg) = @_;
+ my $chld = $SIG{CHLD};
+ if (defined($chld) && $chld eq \&enqueue_reap) {
+ push @$WaitPids, [ $pid, $cb, $arg ];
+
+ # We could've just missed our SIGCHLD, cover it, here:
+ requeue(\&reap_pids);
+ } else {
+ die "Not in EventLoop\n";
+ }
+}
+
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {