]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
treewide: replace confess with croak
[public-inbox.git] / lib / PublicInbox / DS.pm
index a02b3bb78641bd12254e718c3344a2bfef1801de..ec965abe89d4916da8aab84d6e72cb376d969ce1 100644 (file)
 #        (tmpio = [ GLOB, offset, [ length ] ])
 package PublicInbox::DS;
 use strict;
+use v5.10.1;
+use parent qw(Exporter);
 use bytes;
-use POSIX qw(WNOHANG);
+use POSIX qw(WNOHANG sigprocmask SIG_SETMASK);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
-use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more);
-use 5.010_001;
 use Scalar::Util qw(blessed);
 use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
-use Carp qw(confess carp);
+use Carp qw(carp);
+our @EXPORT_OK = qw(now msg_more dwaitpid);
 
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
@@ -50,7 +50,6 @@ our (
      $PostLoopCallback,          # subref to call at the end of each loop, if defined (global)
 
      $LoopTimeout,               # timeout of event loop in milliseconds
-     $DoneInit,                  # if we've done the one-time module init yet
      @Timers,                    # timers
      $in_loop,
      );
@@ -67,20 +66,18 @@ Reset all state
 
 =cut
 sub Reset {
+    $in_loop = undef; # first in case DESTROY callbacks use this
     %DescriptorMap = ();
-    $in_loop = $wait_pids = $later_queue = $reap_armed = undef;
+    $wait_pids = $later_queue = $reap_armed = undef;
     $EXPMAP = {};
     $nextq = $ToClose = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
 
     $PostLoopCallback = undef;
-    $DoneInit = 0;
 
     $_io = undef; # closes real $Epoll FD
     $Epoll = undef; # may call DSKQXS::DESTROY
-
-    *EventLoop = *FirstTimeEventLoop;
 }
 
 =head2 C<< CLASS->SetLoopTimeout( $timeout ) >>
@@ -91,9 +88,7 @@ A timeout of 0 (zero) means poll forever. A timeout of -1 means poll and return
 immediately.
 
 =cut
-sub SetLoopTimeout {
-    return $LoopTimeout = $_[1] + 0;
-}
+sub SetLoopTimeout { $LoopTimeout = $_[1] + 0 }
 
 =head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >>
 
@@ -137,14 +132,13 @@ sub set_cloexec ($) {
     fcntl($_io, F_SETFD, $fl | FD_CLOEXEC);
 }
 
+# caller sets return value to $Epoll
 sub _InitPoller
 {
-    return if $DoneInit;
-    $DoneInit = 1;
-
     if (PublicInbox::Syscall::epoll_defined())  {
-        $Epoll = epoll_create();
-        set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+        my $fd = epoll_create();
+        set_cloexec($fd) if (defined($fd) && $fd >= 0);
+       $fd;
     } else {
         my $cls;
         for (qw(DSKQXS DSPoll)) {
@@ -152,9 +146,8 @@ sub _InitPoller
             last if eval "require $cls";
         }
         $cls->import(qw(epoll_ctl epoll_wait));
-        $Epoll = $cls->new;
+        $cls->new;
     }
-    *EventLoop = *EpollEventLoop;
 }
 
 =head2 C<< CLASS->EventLoop() >>
@@ -163,26 +156,19 @@ Start processing IO events. In most daemon programs this never exits. See
 C<PostLoopCallback> below for how to exit the loop.
 
 =cut
-sub FirstTimeEventLoop {
-    my $class = shift;
-
-    _InitPoller();
-
-    EventLoop($class);
-}
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
 sub next_tick () {
     my $q = $nextq or return;
     $nextq = undef;
-    for (@$q) {
+    for my $obj (@$q) {
         # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak:
         # https://rt.perl.org/Public/Bug/Display.html?id=114340
-        if (blessed($_)) {
-            $_->event_step;
+        if (blessed($obj)) {
+            $obj->event_step;
         } else {
-            $_->();
+            $obj->();
         }
     }
 }
@@ -213,12 +199,17 @@ sub RunTimers {
     my $timeout = int(($Timers[0][0] - $now) * 1000) + 1;
 
     # -1 is an infinite timeout, so prefer a real timeout
-    return $timeout     if $LoopTimeout == -1;
+    ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout;
+}
+
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
 
-    # otherwise pick the lower of our regular timeout and time until
-    # the next timer
-    return $LoopTimeout if $LoopTimeout < $timeout;
-    return $timeout;
+sub block_signals () {
+       my $oldset = POSIX::SigSet->new;
+       my $newset = POSIX::SigSet->new;
+       $newset->fillset or die "fillset: $!";
+       sig_setmask($newset, $oldset);
+       $oldset;
 }
 
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
@@ -230,17 +221,22 @@ sub reap_pids {
        $reap_armed = undef;
        my $tmp = $wait_pids or return;
        $wait_pids = undef;
+       my $oldset = block_signals();
        foreach my $ary (@$tmp) {
                my ($pid, $cb, $arg) = @$ary;
                my $ret = waitpid($pid, WNOHANG);
                if ($ret == 0) {
                        push @$wait_pids, $ary; # autovivifies @$wait_pids
-               } elsif ($cb) {
-                       eval { $cb->($arg, $pid) };
+               } elsif ($ret == $pid) {
+                       if ($cb) {
+                               eval { $cb->($arg, $pid) };
+                               warn "E: dwaitpid($pid) in_loop: $@" if $@;
+                       }
+               } else {
+                       warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
                }
        }
-       # we may not be done, yet, and could've missed/masked a SIGCHLD:
-       $reap_armed //= requeue(\&reap_pids) if $wait_pids;
+       sig_setmask($oldset);
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
@@ -271,21 +267,23 @@ sub PostEventLoop () {
        $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1;
 }
 
-sub EpollEventLoop {
+sub EventLoop {
+    $Epoll //= _InitPoller();
     local $in_loop = 1;
+    my @events;
+    my $obj; # guard stack-not-refcounted w/ Carp + @DB::args
     do {
-        my @events;
-        my $i;
         my $timeout = RunTimers();
 
         # get up to 1000 events
-        my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
-        for ($i=0; $i<$evcount; $i++) {
+        epoll_wait($Epoll, 1000, $timeout, \@events);
+        for my $fd (@events) {
             # it's possible epoll_wait returned many events, including some at the end
             # that ones in the front triggered unregister-interest actions.  if we
             # can't find the %sock entry, it's because we're no longer interested
             # in that event.
-            $DescriptorMap{$events[$i]->[0]}->event_step;
+            $obj = $DescriptorMap{$fd};
+            $obj->event_step;
         }
     } while (PostEventLoop());
     _run_later();
@@ -330,17 +328,16 @@ sub new {
     $self->{sock} = $sock;
     my $fd = fileno($sock);
 
-    _InitPoller();
-
+    $Epoll //= _InitPoller();
 retry:
     if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
         if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
             $ev &= ~EPOLLEXCLUSIVE;
             goto retry;
         }
-        die "couldn't add epoll watch for $fd: $!\n";
+        die "EPOLL_CTL_ADD $self/$sock/$fd: $!";
     }
-    confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})")
+    croak("FD:$fd in use by $DescriptorMap{$fd} (for $self/$sock)")
         if defined($DescriptorMap{$fd});
 
     $DescriptorMap{$fd} = $self;
@@ -371,7 +368,7 @@ sub close {
     # notifications about it
     my $fd = fileno($sock);
     epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
-        confess("EPOLL_CTL_DEL: $!");
+        croak("EPOLL_CTL_DEL($self/$sock): $!");
 
     # we explicitly don't delete from DescriptorMap here until we
     # actually close the socket, as we might be in the middle of
@@ -590,7 +587,7 @@ sub msg_more ($$) {
 sub epwait ($$) {
     my ($sock, $ev) = @_;
     epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
-        confess("EPOLL_CTL_MOD $!");
+        croak("EPOLL_CTL_MOD($sock): $!");
 }
 
 # return true if complete, false if incomplete (or failure)
@@ -629,13 +626,23 @@ sub shutdn ($) {
     }
 }
 
-# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
-sub dwaitpid ($$$) {
-       die "Not in EventLoop\n" unless $in_loop;
-       push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ]
-
-       # We could've just missed our SIGCHLD, cover it, here:
-       enqueue_reap();
+sub dwaitpid ($;$$) {
+       my ($pid, $cb, $arg) = @_;
+       if ($in_loop) {
+               push @$wait_pids, [ $pid, $cb, $arg ];
+               # We could've just missed our SIGCHLD, cover it, here:
+               enqueue_reap();
+       } else {
+               my $ret = waitpid($pid, 0);
+               if ($ret == $pid) {
+                       if ($cb) {
+                               eval { $cb->($arg, $pid) };
+                               carp "E: dwaitpid($pid) !in_loop: $@" if $@;
+                       }
+               } else {
+                       carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
+               }
+       }
 }
 
 sub _run_later () {