]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
treewide: favor open(..., '+<&=', $fd)
[public-inbox.git] / lib / PublicInbox / DS.pm
index 2d312f0ac15084fe6b295351748d481a013d8ac9..7a4dfed06242320f28dcb7decbfe3a76a8bcd535 100644 (file)
@@ -25,19 +25,19 @@ use v5.10.1;
 use parent qw(Exporter);
 use bytes;
 use POSIX qw(WNOHANG sigprocmask SIG_SETMASK);
-use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 use Scalar::Util qw(blessed);
 use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
-use Carp qw(confess carp);
-our @EXPORT_OK = qw(now msg_more dwaitpid);
+use Carp qw(carp croak);
+our @EXPORT_OK = qw(now msg_more dwaitpid add_timer);
 
+my %Stack;
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
-my $later_queue; # list of callbacks to run at some later interval
+my $later_q; # list of callbacks to run at some later interval
 my $EXPMAP; # fd -> idle_time
 our $EXPTIME = 180; # 3 minutes
 my ($later_timer, $reap_armed, $exp_timer);
@@ -66,18 +66,25 @@ Reset all state
 
 =cut
 sub Reset {
-    $in_loop = undef; # first in case DESTROY callbacks use this
-    %DescriptorMap = ();
-    $wait_pids = $later_queue = $reap_armed = undef;
-    $EXPMAP = {};
-    $nextq = $ToClose = $later_timer = $exp_timer = undef;
-    $LoopTimeout = -1;  # no timeout by default
-    @Timers = ();
-
-    $PostLoopCallback = undef;
-
-    $_io = undef; # closes real $Epoll FD
-    $Epoll = undef; # may call DSKQXS::DESTROY
+       do {
+               $in_loop = undef; # first in case DESTROY callbacks use this
+               %DescriptorMap = ();
+               @Timers = ();
+               $PostLoopCallback = undef;
+
+               # we may be iterating inside one of these on our stack
+               my @q = delete @Stack{keys %Stack};
+               for my $q (@q) { @$q = () }
+               $EXPMAP = {};
+               $wait_pids = $later_q = $nextq = $ToClose = undef;
+               $_io = undef; # closes real $Epoll FD
+               $Epoll = undef; # may call DSKQXS::DESTROY
+       } while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
+               $later_q || $ToClose || keys(%DescriptorMap) ||
+               $PostLoopCallback);
+
+       $reap_armed = $later_timer = $exp_timer = undef;
+       $LoopTimeout = -1;  # no timeout by default
 }
 
 =head2 C<< CLASS->SetLoopTimeout( $timeout ) >>
@@ -96,12 +103,12 @@ Add a timer to occur $seconds from now. $seconds may be fractional, but timers
 are not guaranteed to fire at the exact time you ask for.
 
 =cut
-sub add_timer ($$;$) {
-    my ($secs, $coderef, $arg) = @_;
+sub add_timer ($$;@) {
+    my ($secs, $coderef, @args) = @_;
 
     my $fire_time = now() + $secs;
 
-    my $timer = [$fire_time, $coderef, $arg];
+    my $timer = [$fire_time, $coderef, @args];
 
     if (!@Timers || $fire_time >= $Timers[-1][0]) {
         push @Timers, $timer;
@@ -127,7 +134,7 @@ sub add_timer ($$;$) {
 sub set_cloexec ($) {
     my ($fd) = @_;
 
-    $_io = IO::Handle->new_from_fd($fd, 'r+') or return;
+    open($_io, '+<&=', $fd) or return;
     defined(my $fl = fcntl($_io, F_GETFD, 0)) or return;
     fcntl($_io, F_SETFD, $fl | FD_CLOEXEC);
 }
@@ -160,17 +167,19 @@ C<PostLoopCallback> below for how to exit the loop.
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
 sub next_tick () {
-    my $q = $nextq or return;
-    $nextq = undef;
-    for (@$q) {
-        # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak:
-        # https://rt.perl.org/Public/Bug/Display.html?id=114340
-        if (blessed($_)) {
-            $_->event_step;
-        } else {
-            $_->();
-        }
-    }
+       my $q = $nextq or return;
+       $nextq = undef;
+       $Stack{cur_runq} = $q;
+       for my $obj (@$q) {
+               # avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak:
+               # https://rt.perl.org/Public/Bug/Display.html?id=114340
+               if (blessed($obj)) {
+                       $obj->event_step;
+               } else {
+                       $obj->();
+               }
+       }
+       delete $Stack{cur_runq};
 }
 
 # runs timers and returns milliseconds for next one, or next event loop
@@ -184,7 +193,7 @@ sub RunTimers {
     # Run expired timers
     while (@Timers && $Timers[0][0] <= $now) {
         my $to_run = shift(@Timers);
-        $to_run->[1]->($to_run->[2]);
+        $to_run->[1]->(@$to_run[2..$#$to_run]);
     }
 
     # timers may enqueue into nextq:
@@ -221,6 +230,7 @@ sub reap_pids {
        $reap_armed = undef;
        my $tmp = $wait_pids or return;
        $wait_pids = undef;
+       $Stack{reap_runq} = $tmp;
        my $oldset = block_signals();
        foreach my $ary (@$tmp) {
                my ($pid, $cb, $arg) = @$ary;
@@ -237,6 +247,7 @@ sub reap_pids {
                }
        }
        sig_setmask($oldset);
+       delete $Stack{reap_runq};
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
@@ -271,7 +282,6 @@ sub EventLoop {
     $Epoll //= _InitPoller();
     local $in_loop = 1;
     my @events;
-    my $obj; # guard stack-not-refcounted w/ Carp + @DB::args
     do {
         my $timeout = RunTimers();
 
@@ -282,7 +292,9 @@ sub EventLoop {
             # that ones in the front triggered unregister-interest actions.  if we
             # can't find the %sock entry, it's because we're no longer interested
             # in that event.
-            $obj = $DescriptorMap{$fd};
+
+           # guard stack-not-refcounted w/ Carp + @DB::args
+            my $obj = $DescriptorMap{$fd};
             $obj->event_step;
         }
     } while (PostEventLoop());
@@ -335,9 +347,9 @@ retry:
             $ev &= ~EPOLLEXCLUSIVE;
             goto retry;
         }
-        die "couldn't add epoll watch for $fd: $!\n";
+        die "EPOLL_CTL_ADD $self/$sock/$fd: $!";
     }
-    confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})")
+    croak("FD:$fd in use by $DescriptorMap{$fd} (for $self/$sock)")
         if defined($DescriptorMap{$fd});
 
     $DescriptorMap{$fd} = $self;
@@ -368,7 +380,7 @@ sub close {
     # notifications about it
     my $fd = fileno($sock);
     epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
-        confess("EPOLL_CTL_DEL: $!");
+        croak("EPOLL_CTL_DEL($self/$sock): $!");
 
     # we explicitly don't delete from DescriptorMap here until we
     # actually close the socket, as we might be in the middle of
@@ -587,7 +599,7 @@ sub msg_more ($$) {
 sub epwait ($$) {
     my ($sock, $ev) = @_;
     epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
-        confess("EPOLL_CTL_MOD $!");
+        croak("EPOLL_CTL_MOD($sock): $!");
 }
 
 # return true if complete, false if incomplete (or failure)
@@ -646,13 +658,15 @@ sub dwaitpid ($;$$) {
 }
 
 sub _run_later () {
-       my $run = $later_queue or return;
-       $later_timer = $later_queue = undef;
-       $_->() for @$run;
+       my $q = $later_q or return;
+       $later_timer = $later_q = undef;
+       $Stack{later_q} = $q;
+       $_->() for @$q;
+       delete $Stack{later_q};
 }
 
 sub later ($) {
-       push @$later_queue, $_[0]; # autovivifies @$later_queue
+       push @$later_q, $_[0]; # autovivifies @$later_q
        $later_timer //= add_timer(60, \&_run_later);
 }