X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=7a4dfed06242320f28dcb7decbfe3a76a8bcd535;hb=e901a56b3b30b22f16bc9c6460150b2b402b4ee7;hp=a3f2e76c16afcfd84901ca38492ad4eebbd1d6a5;hpb=62bb26d307a106d6c9b42e36b583fd04d90a4687;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index a3f2e76c..7a4dfed0 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -21,26 +21,26 @@ # (tmpio = [ GLOB, offset, [ length ] ]) package PublicInbox::DS; use strict; +use v5.10.1; +use parent qw(Exporter); use bytes; -use POSIX qw(WNOHANG); -use IO::Handle qw(); +use POSIX qw(WNOHANG sigprocmask SIG_SETMASK); use Fcntl qw(SEEK_SET :DEFAULT O_APPEND); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); -use parent qw(Exporter); -our @EXPORT_OK = qw(now msg_more); -use 5.010_001; use Scalar::Util qw(blessed); use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); -use Carp qw(confess carp); +use Carp qw(carp croak); +our @EXPORT_OK = qw(now msg_more dwaitpid add_timer); +my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] -my $later_queue; # list of callbacks to run at some later interval +my $later_q; # list of callbacks to run at some later interval my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes -my ($later_timer, $reap_timer, $exp_timer); +my ($later_timer, $reap_armed, $exp_timer); my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -50,7 +50,6 @@ our ( $PostLoopCallback, # subref to call at the end of each loop, if defined (global) $LoopTimeout, # timeout of event loop in milliseconds - $DoneInit, # if we've done the one-time module init yet @Timers, # timers $in_loop, ); @@ -67,20 +66,25 @@ Reset all state =cut sub Reset { - %DescriptorMap = (); - $in_loop = $wait_pids = $later_queue = undef; - $EXPMAP = {}; - $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef; - $LoopTimeout = -1; # no timeout by default - @Timers = (); - - $PostLoopCallback = undef; - $DoneInit = 0; - - $_io = undef; # closes real $Epoll FD - $Epoll = undef; # may call DSKQXS::DESTROY - - *EventLoop = *FirstTimeEventLoop; + do { + $in_loop = undef; # first in case DESTROY callbacks use this + %DescriptorMap = (); + @Timers = (); + $PostLoopCallback = undef; + + # we may be iterating inside one of these on our stack + my @q = delete @Stack{keys %Stack}; + for my $q (@q) { @$q = () } + $EXPMAP = {}; + $wait_pids = $later_q = $nextq = $ToClose = undef; + $_io = undef; # closes real $Epoll FD + $Epoll = undef; # may call DSKQXS::DESTROY + } while (@Timers || keys(%Stack) || $nextq || $wait_pids || + $later_q || $ToClose || keys(%DescriptorMap) || + $PostLoopCallback); + + $reap_armed = $later_timer = $exp_timer = undef; + $LoopTimeout = -1; # no timeout by default } =head2 C<< CLASS->SetLoopTimeout( $timeout ) >> @@ -91,9 +95,7 @@ A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return immediately. =cut -sub SetLoopTimeout { - return $LoopTimeout = $_[1] + 0; -} +sub SetLoopTimeout { $LoopTimeout = $_[1] + 0 } =head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >> @@ -101,12 +103,12 @@ Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. =cut -sub add_timer ($$;$) { - my ($secs, $coderef, $arg) = @_; +sub add_timer ($$;@) { + my ($secs, $coderef, @args) = @_; my $fire_time = now() + $secs; - my $timer = [$fire_time, $coderef, $arg]; + my $timer = [$fire_time, $coderef, @args]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -132,19 +134,18 @@ sub add_timer ($$;$) { sub set_cloexec ($) { my ($fd) = @_; - $_io = IO::Handle->new_from_fd($fd, 'r+') or return; + open($_io, '+<&=', $fd) or return; defined(my $fl = fcntl($_io, F_GETFD, 0)) or return; fcntl($_io, F_SETFD, $fl | FD_CLOEXEC); } +# caller sets return value to $Epoll sub _InitPoller { - return if $DoneInit; - $DoneInit = 1; - if (PublicInbox::Syscall::epoll_defined()) { - $Epoll = epoll_create(); - set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0); + my $fd = epoll_create(); + set_cloexec($fd) if (defined($fd) && $fd >= 0); + $fd; } else { my $cls; for (qw(DSKQXS DSPoll)) { @@ -152,9 +153,8 @@ sub _InitPoller last if eval "require $cls"; } $cls->import(qw(epoll_ctl epoll_wait)); - $Epoll = $cls->new; + $cls->new; } - *EventLoop = *EpollEventLoop; } =head2 C<< CLASS->EventLoop() >> @@ -163,28 +163,23 @@ Start processing IO events. In most daemon programs this never exits. See C below for how to exit the loop. =cut -sub FirstTimeEventLoop { - my $class = shift; - - _InitPoller(); - - EventLoop($class); -} sub now () { clock_gettime(CLOCK_MONOTONIC) } sub next_tick () { - my $q = $nextq or return; - $nextq = undef; - for (@$q) { - # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: - # https://rt.perl.org/Public/Bug/Display.html?id=114340 - if (blessed($_)) { - $_->event_step; - } else { - $_->(); - } - } + my $q = $nextq or return; + $nextq = undef; + $Stack{cur_runq} = $q; + for my $obj (@$q) { + # avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: + # https://rt.perl.org/Public/Bug/Display.html?id=114340 + if (blessed($obj)) { + $obj->event_step; + } else { + $obj->(); + } + } + delete $Stack{cur_runq}; } # runs timers and returns milliseconds for next one, or next event loop @@ -198,7 +193,7 @@ sub RunTimers { # Run expired timers while (@Timers && $Timers[0][0] <= $now) { my $to_run = shift(@Timers); - $to_run->[1]->($to_run->[2]); + $to_run->[1]->(@$to_run[2..$#$to_run]); } # timers may enqueue into nextq: @@ -213,36 +208,50 @@ sub RunTimers { my $timeout = int(($Timers[0][0] - $now) * 1000) + 1; # -1 is an infinite timeout, so prefer a real timeout - return $timeout if $LoopTimeout == -1; + ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout; +} - # otherwise pick the lower of our regular timeout and time until - # the next timer - return $LoopTimeout if $LoopTimeout < $timeout; - return $timeout; +sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" } + +sub block_signals () { + my $oldset = POSIX::SigSet->new; + my $newset = POSIX::SigSet->new; + $newset->fillset or die "fillset: $!"; + sig_setmask($newset, $oldset); + $oldset; } # We can't use waitpid(-1) safely here since it can hit ``, system(), # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() # until we've hit EOF when reading the stdout of the child. + sub reap_pids { - my $tmp = $wait_pids or return; - $wait_pids = $reap_timer = undef; - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$wait_pids, $ary; # autovivifies @$wait_pids - } elsif ($cb) { - eval { $cb->($arg, $pid) }; - } - } - # we may not be done, yet, and could've missed/masked a SIGCHLD: - $reap_timer = add_timer(1, \&reap_pids) if $wait_pids; + $reap_armed = undef; + my $tmp = $wait_pids or return; + $wait_pids = undef; + $Stack{reap_runq} = $tmp; + my $oldset = block_signals(); + foreach my $ary (@$tmp) { + my ($pid, $cb, $arg) = @$ary; + my $ret = waitpid($pid, WNOHANG); + if ($ret == 0) { + push @$wait_pids, $ary; # autovivifies @$wait_pids + } elsif ($ret == $pid) { + if ($cb) { + eval { $cb->($arg, $pid) }; + warn "E: dwaitpid($pid) in_loop: $@" if $@; + } + } else { + warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; + } + } + sig_setmask($oldset); + delete $Stack{reap_runq}; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) -sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies +sub enqueue_reap () { $reap_armed //= requeue(\&reap_pids) } sub in_loop () { $in_loop } @@ -269,21 +278,24 @@ sub PostEventLoop () { $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; } -sub EpollEventLoop { +sub EventLoop { + $Epoll //= _InitPoller(); local $in_loop = 1; + my @events; do { - my @events; - my $i; my $timeout = RunTimers(); # get up to 1000 events - my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events); - for ($i=0; $i<$evcount; $i++) { + epoll_wait($Epoll, 1000, $timeout, \@events); + for my $fd (@events) { # it's possible epoll_wait returned many events, including some at the end # that ones in the front triggered unregister-interest actions. if we # can't find the %sock entry, it's because we're no longer interested # in that event. - $DescriptorMap{$events[$i]->[0]}->event_step; + + # guard stack-not-refcounted w/ Carp + @DB::args + my $obj = $DescriptorMap{$fd}; + $obj->event_step; } } while (PostEventLoop()); _run_later(); @@ -328,16 +340,16 @@ sub new { $self->{sock} = $sock; my $fd = fileno($sock); - _InitPoller(); - + $Epoll //= _InitPoller(); +retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { $ev &= ~EPOLLEXCLUSIVE; goto retry; } - die "couldn't add epoll watch for $fd: $!\n"; + die "EPOLL_CTL_ADD $self/$sock/$fd: $!"; } - confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})") + croak("FD:$fd in use by $DescriptorMap{$fd} (for $self/$sock)") if defined($DescriptorMap{$fd}); $DescriptorMap{$fd} = $self; @@ -368,7 +380,7 @@ sub close { # notifications about it my $fd = fileno($sock); epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and - confess("EPOLL_CTL_DEL: $!"); + croak("EPOLL_CTL_DEL($self/$sock): $!"); # we explicitly don't delete from DescriptorMap here until we # actually close the socket, as we might be in the middle of @@ -430,7 +442,8 @@ next_buf: goto next_buf; } } elsif ($! == EAGAIN) { - epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); + my $ev = epbit($sock, EPOLLOUT) or return $self->close; + epwait($sock, $ev | EPOLLONESHOT); return 0; } else { return $self->close; @@ -466,7 +479,8 @@ sub do_read ($$$;$) { # common for clients to break connections without warning, # would be too noisy to log here: if ($! == EAGAIN) { - epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT); + my $ev = epbit($sock, EPOLLIN) or return $self->close; + epwait($sock, $ev | EPOLLONESHOT); rbuf_idle($self, $rbuf); 0; } else { @@ -540,7 +554,8 @@ sub write { return 1 if $written == $to_write; requeue($self); # runs: event_step -> flush_write } elsif ($! == EAGAIN) { - epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); + my $ev = epbit($sock, EPOLLOUT) or return $self->close; + epwait($sock, $ev | EPOLLONESHOT); $written = 0; } else { return $self->close; @@ -584,7 +599,7 @@ sub msg_more ($$) { sub epwait ($$) { my ($sock, $ev) = @_; epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and - confess("EPOLL_CTL_MOD $!"); + croak("EPOLL_CTL_MOD($sock): $!"); } # return true if complete, false if incomplete (or failure) @@ -593,7 +608,8 @@ sub accept_tls_step ($) { my $sock = $self->{sock} or return; return 1 if $sock->accept_SSL; return $self->close if $! != EAGAIN; - epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); + my $ev = PublicInbox::TLS::epollbit() or return $self->close; + epwait($sock, $ev | EPOLLONESHOT); unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies 0; } @@ -604,7 +620,8 @@ sub shutdn_tls_step ($) { my $sock = $self->{sock} or return; return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); return $self->close if $! != EAGAIN; - epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); + my $ev = PublicInbox::TLS::epollbit() or return $self->close; + epwait($sock, $ev | EPOLLONESHOT); unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies 0; } @@ -621,23 +638,35 @@ sub shutdn ($) { } } -# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t) -sub dwaitpid ($$$) { - die "Not in EventLoop\n" unless $in_loop; - push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ] - - # We could've just missed our SIGCHLD, cover it, here: - requeue(\&reap_pids); +sub dwaitpid ($;$$) { + my ($pid, $cb, $arg) = @_; + if ($in_loop) { + push @$wait_pids, [ $pid, $cb, $arg ]; + # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap(); + } else { + my $ret = waitpid($pid, 0); + if ($ret == $pid) { + if ($cb) { + eval { $cb->($arg, $pid) }; + carp "E: dwaitpid($pid) !in_loop: $@" if $@; + } + } else { + carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?"; + } + } } sub _run_later () { - my $run = $later_queue or return; - $later_timer = $later_queue = undef; - $_->() for @$run; + my $q = $later_q or return; + $later_timer = $later_q = undef; + $Stack{later_q} = $q; + $_->() for @$q; + delete $Stack{later_q}; } sub later ($) { - push @$later_queue, $_[0]; # autovivifies @$later_queue + push @$later_q, $_[0]; # autovivifies @$later_q $later_timer //= add_timer(60, \&_run_later); }