X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=7a4dfed06242320f28dcb7decbfe3a76a8bcd535;hb=e901a56b3b30b22f16bc9c6460150b2b402b4ee7;hp=2d312f0ac15084fe6b295351748d481a013d8ac9;hpb=0795b0906cc81f400e0e5b9b53f812627dbd19c0;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2d312f0a..7a4dfed0 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -25,19 +25,19 @@ use v5.10.1; use parent qw(Exporter); use bytes; use POSIX qw(WNOHANG sigprocmask SIG_SETMASK); -use IO::Handle qw(); use Fcntl qw(SEEK_SET :DEFAULT O_APPEND); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use Scalar::Util qw(blessed); use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); -use Carp qw(confess carp); -our @EXPORT_OK = qw(now msg_more dwaitpid); +use Carp qw(carp croak); +our @EXPORT_OK = qw(now msg_more dwaitpid add_timer); +my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] -my $later_queue; # list of callbacks to run at some later interval +my $later_q; # list of callbacks to run at some later interval my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes my ($later_timer, $reap_armed, $exp_timer); @@ -66,18 +66,25 @@ Reset all state =cut sub Reset { - $in_loop = undef; # first in case DESTROY callbacks use this - %DescriptorMap = (); - $wait_pids = $later_queue = $reap_armed = undef; - $EXPMAP = {}; - $nextq = $ToClose = $later_timer = $exp_timer = undef; - $LoopTimeout = -1; # no timeout by default - @Timers = (); - - $PostLoopCallback = undef; - - $_io = undef; # closes real $Epoll FD - $Epoll = undef; # may call DSKQXS::DESTROY + do { + $in_loop = undef; # first in case DESTROY callbacks use this + %DescriptorMap = (); + @Timers = (); + $PostLoopCallback = undef; + + # we may be iterating inside one of these on our stack + my @q = delete @Stack{keys %Stack}; + for my $q (@q) { @$q = () } + $EXPMAP = {}; + $wait_pids = $later_q = $nextq = $ToClose = undef; + $_io = undef; # closes real $Epoll FD + $Epoll = undef; # may call DSKQXS::DESTROY + } while (@Timers || keys(%Stack) || $nextq || $wait_pids || + $later_q || $ToClose || keys(%DescriptorMap) || + $PostLoopCallback); + + $reap_armed = $later_timer = $exp_timer = undef; + $LoopTimeout = -1; # no timeout by default } =head2 C<< CLASS->SetLoopTimeout( $timeout ) >> @@ -96,12 +103,12 @@ Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. =cut -sub add_timer ($$;$) { - my ($secs, $coderef, $arg) = @_; +sub add_timer ($$;@) { + my ($secs, $coderef, @args) = @_; my $fire_time = now() + $secs; - my $timer = [$fire_time, $coderef, $arg]; + my $timer = [$fire_time, $coderef, @args]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -127,7 +134,7 @@ sub add_timer ($$;$) { sub set_cloexec ($) { my ($fd) = @_; - $_io = IO::Handle->new_from_fd($fd, 'r+') or return; + open($_io, '+<&=', $fd) or return; defined(my $fl = fcntl($_io, F_GETFD, 0)) or return; fcntl($_io, F_SETFD, $fl | FD_CLOEXEC); } @@ -160,17 +167,19 @@ C below for how to exit the loop. sub now () { clock_gettime(CLOCK_MONOTONIC) } sub next_tick () { - my $q = $nextq or return; - $nextq = undef; - for (@$q) { - # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: - # https://rt.perl.org/Public/Bug/Display.html?id=114340 - if (blessed($_)) { - $_->event_step; - } else { - $_->(); - } - } + my $q = $nextq or return; + $nextq = undef; + $Stack{cur_runq} = $q; + for my $obj (@$q) { + # avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: + # https://rt.perl.org/Public/Bug/Display.html?id=114340 + if (blessed($obj)) { + $obj->event_step; + } else { + $obj->(); + } + } + delete $Stack{cur_runq}; } # runs timers and returns milliseconds for next one, or next event loop @@ -184,7 +193,7 @@ sub RunTimers { # Run expired timers while (@Timers && $Timers[0][0] <= $now) { my $to_run = shift(@Timers); - $to_run->[1]->($to_run->[2]); + $to_run->[1]->(@$to_run[2..$#$to_run]); } # timers may enqueue into nextq: @@ -221,6 +230,7 @@ sub reap_pids { $reap_armed = undef; my $tmp = $wait_pids or return; $wait_pids = undef; + $Stack{reap_runq} = $tmp; my $oldset = block_signals(); foreach my $ary (@$tmp) { my ($pid, $cb, $arg) = @$ary; @@ -237,6 +247,7 @@ sub reap_pids { } } sig_setmask($oldset); + delete $Stack{reap_runq}; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) @@ -271,7 +282,6 @@ sub EventLoop { $Epoll //= _InitPoller(); local $in_loop = 1; my @events; - my $obj; # guard stack-not-refcounted w/ Carp + @DB::args do { my $timeout = RunTimers(); @@ -282,7 +292,9 @@ sub EventLoop { # that ones in the front triggered unregister-interest actions. if we # can't find the %sock entry, it's because we're no longer interested # in that event. - $obj = $DescriptorMap{$fd}; + + # guard stack-not-refcounted w/ Carp + @DB::args + my $obj = $DescriptorMap{$fd}; $obj->event_step; } } while (PostEventLoop()); @@ -335,9 +347,9 @@ retry: $ev &= ~EPOLLEXCLUSIVE; goto retry; } - die "couldn't add epoll watch for $fd: $!\n"; + die "EPOLL_CTL_ADD $self/$sock/$fd: $!"; } - confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})") + croak("FD:$fd in use by $DescriptorMap{$fd} (for $self/$sock)") if defined($DescriptorMap{$fd}); $DescriptorMap{$fd} = $self; @@ -368,7 +380,7 @@ sub close { # notifications about it my $fd = fileno($sock); epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and - confess("EPOLL_CTL_DEL: $!"); + croak("EPOLL_CTL_DEL($self/$sock): $!"); # we explicitly don't delete from DescriptorMap here until we # actually close the socket, as we might be in the middle of @@ -587,7 +599,7 @@ sub msg_more ($$) { sub epwait ($$) { my ($sock, $ev) = @_; epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and - confess("EPOLL_CTL_MOD $!"); + croak("EPOLL_CTL_MOD($sock): $!"); } # return true if complete, false if incomplete (or failure) @@ -646,13 +658,15 @@ sub dwaitpid ($;$$) { } sub _run_later () { - my $run = $later_queue or return; - $later_timer = $later_queue = undef; - $_->() for @$run; + my $q = $later_q or return; + $later_timer = $later_q = undef; + $Stack{later_q} = $q; + $_->() for @$q; + delete $Stack{later_q}; } sub later ($) { - push @$later_queue, $_[0]; # autovivifies @$later_queue + push @$later_q, $_[0]; # autovivifies @$later_q $later_timer //= add_timer(60, \&_run_later); }