]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/DS.pm
config: support "inboxdir" in addition to "mainrepo"
[public-inbox.git] / lib / PublicInbox / DS.pm
index 57c420634ca42529f8b7c01a29d459723ad5933c..7f7cb85d8583cd69c509f7a7ca2280dcd01e156e 100644 (file)
@@ -16,7 +16,7 @@
 package PublicInbox::DS;
 use strict;
 use bytes;
-use POSIX ();
+use POSIX qw(WNOHANG);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -24,8 +24,10 @@ use parent qw(Exporter);
 our @EXPORT_OK = qw(now msg_more);
 use warnings;
 use 5.010_001;
+use Scalar::Util qw(blessed);
 
 use PublicInbox::Syscall qw(:epoll);
+use PublicInbox::Tmpfile;
 
 use fields ('sock',              # underlying socket
             'rbuf',              # scalarref, usually undef
@@ -33,11 +35,13 @@ use fields ('sock',              # underlying socket
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
 
-use Errno  qw(EAGAIN EINVAL EEXIST);
+use Errno  qw(EAGAIN EINVAL);
 use Carp   qw(croak confess carp);
 require File::Spec;
 
 my $nextq = []; # queue for next_tick
+my $WaitPids = [];               # list of [ pid, callback, callback_arg ]
+my $reap_timer;
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -64,6 +68,8 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
+    $WaitPids = [];
+    $reap_timer = undef;
     @ToClose = ();
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
@@ -173,10 +179,12 @@ sub next_tick () {
     my $q = $nextq;
     $nextq = [];
     for (@$q) {
-        if (ref($_) eq 'CODE') {
-            $_->();
-        } else {
+        # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak:
+        # https://rt.perl.org/Public/Bug/Display.html?id=114340
+        if (blessed($_)) {
             $_->event_step;
+        } else {
+            $_->();
         }
     }
 }
@@ -215,7 +223,33 @@ sub RunTimers {
     return $timeout;
 }
 
+# We can't use waitpid(-1) safely here since it can hit ``, system(),
+# and other things.  So we scan the $WaitPids list, which is hopefully
+# not too big.
+sub reap_pids {
+    my $tmp = $WaitPids;
+    $WaitPids = [];
+    $reap_timer = undef;
+    foreach my $ary (@$tmp) {
+        my ($pid, $cb, $arg) = @$ary;
+        my $ret = waitpid($pid, WNOHANG);
+        if ($ret == 0) {
+            push @$WaitPids, $ary;
+        } elsif ($cb) {
+            eval { $cb->($arg, $pid) };
+        }
+    }
+    if (@$WaitPids) {
+        # we may not be donea, and we may miss our
+        $reap_timer = AddTimer(undef, 1, \&reap_pids);
+    }
+}
+
+# reentrant SIGCHLD handler (since reap_pids is not reentrant)
+sub enqueue_reap ($) { push @$nextq, \&reap_pids };
+
 sub EpollEventLoop {
+    local $SIG{CHLD} = \&enqueue_reap;
     while (1) {
         my @events;
         my $i;
@@ -440,6 +474,7 @@ sub do_read ($$$;$) {
     if ($! == EAGAIN) {
         epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
         rbuf_idle($self, $rbuf);
+        0;
     } else {
         $self->close;
     }
@@ -457,15 +492,8 @@ sub drop {
 # PerlIO::mmap or PerlIO::scalar if needed
 sub tmpio ($$$) {
     my ($self, $bref, $off) = @_;
-    my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
-    do {
-        my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
-        if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
-            unlink($fn) or return drop($self, "unlink($fn) $!");
-        } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
-            return drop($self, "open: $!");
-        }
-    } until (defined $fh);
+    my $fh = tmpfile('wbuf', $self->{sock}, 1) or
+        return drop($self, "tmpfile $!");
     $fh->autoflush(1);
     my $len = bytes::length($$bref) - $off;
     $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
@@ -552,14 +580,15 @@ sub msg_more ($$) {
             return 0;
         }
     }
-    $self->write(\($_[1]));
+
+    # don't redispatch into NNTPdeflate::write
+    PublicInbox::DS::write($self, \($_[1]));
 }
 
 sub epwait ($$) {
     my ($sock, $ev) = @_;
     epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
         confess("EPOLL_CTL_MOD $!");
-    0;
 }
 
 # return true if complete, false if incomplete (or failure)
@@ -570,8 +599,10 @@ sub accept_tls_step ($) {
     return $self->close if $! != EAGAIN;
     epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
     unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+    0;
 }
 
+# return true if complete, false if incomplete (or failure)
 sub shutdn_tls_step ($) {
     my ($self) = @_;
     my $sock = $self->{sock} or return;
@@ -579,6 +610,7 @@ sub shutdn_tls_step ($) {
     return $self->close if $! != EAGAIN;
     epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
     unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+    0;
 }
 
 # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
@@ -592,6 +624,21 @@ sub shutdn ($) {
        $self->close;
     }
 }
+
+# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
+sub dwaitpid ($$$) {
+    my ($pid, $cb, $arg) = @_;
+    my $chld = $SIG{CHLD};
+    if (defined($chld) && $chld eq \&enqueue_reap) {
+        push @$WaitPids, [ $pid, $cb, $arg ];
+
+        # We could've just missed our SIGCHLD, cover it, here:
+        requeue(\&reap_pids);
+    } else {
+        die "Not in EventLoop\n";
+    }
+}
+
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {