]> Sergey Matveev's repositories - public-inbox.git/commitdiff
Merge remote-tracking branch 'origin/nntp-tls'
authorEric Wong <e@80x24.org>
Wed, 26 Jun 2019 06:36:27 +0000 (06:36 +0000)
committerEric Wong <e@80x24.org>
Wed, 26 Jun 2019 06:36:27 +0000 (06:36 +0000)
* origin/nntp-tls: (59 commits)
  ds: ->write must not clobber empty wbuf array
  Makefile: skip DSKQXS in global syntax check
  ds: reduce overhead of tempfile creation
  Revert "ci: require IO::KQueue on FreeBSD, for now"
  ds: reimplement IO::Poll support to look like epoll
  ds: split out IO::KQueue-specific code
  daemon: use FreeBSD accept filters on non-NNTP
  daemon: set TCP_DEFER_ACCEPT on everything but NNTP
  nntp: send greeting immediately for plain sockets
  ci: require IO::KQueue on FreeBSD, for now
  nntp: lazily allocate and stash rbuf
  ds: flush_write runs ->write callbacks even if closed
  nntp: simplify long response logic and fix nesting
  ds: always use EV_ADD with EV_SET
  nntp: reduce allocations for greeting
  ds: allow ->write callbacks to syswrite directly
  daemon: use SSL_MODE_RELEASE_BUFFERS
  t/nntpd-tls: slow client connection test
  nntp: call SSL_shutdown in normal cases
  ds|nntp: use CORE::close on socket
  ...

27 files changed:
MANIFEST
Makefile.PL
certs/.gitignore [new file with mode: 0644]
certs/create-certs.perl [new file with mode: 0755]
lib/PublicInbox/DS.pm
lib/PublicInbox/DSKQXS.pm [new file with mode: 0644]
lib/PublicInbox/DSPoll.pm [new file with mode: 0644]
lib/PublicInbox/Daemon.pm
lib/PublicInbox/EvCleanup.pm
lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/Listener.pm
lib/PublicInbox/NNTP.pm
lib/PublicInbox/NNTPD.pm
lib/PublicInbox/ParentPipe.pm
lib/PublicInbox/Qspawn.pm
lib/PublicInbox/Spawn.pm
lib/PublicInbox/Syscall.pm
lib/PublicInbox/TLS.pm [new file with mode: 0644]
script/public-inbox-nntpd
t/ds-poll.t [new file with mode: 0644]
t/httpd-corner.t
t/httpd.t
t/nntpd-tls.t [new file with mode: 0644]
t/nntpd.t
t/spawn.t

index c76939767544da067e55410800f97abc7afffa7d..29920953e962b30124c87f24f9f2bef973906761 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -31,6 +31,8 @@ MANIFEST
 Makefile.PL
 README
 TODO
+certs/.gitignore
+certs/create-certs.perl
 ci/README
 ci/deps.perl
 ci/profiles.sh
@@ -75,6 +77,8 @@ lib/PublicInbox/Cgit.pm
 lib/PublicInbox/Config.pm
 lib/PublicInbox/ContentId.pm
 lib/PublicInbox/DS.pm
+lib/PublicInbox/DSKQXS.pm
+lib/PublicInbox/DSPoll.pm
 lib/PublicInbox/Daemon.pm
 lib/PublicInbox/Emergency.pm
 lib/PublicInbox/EvCleanup.pm
@@ -129,6 +133,7 @@ lib/PublicInbox/Spamcheck/Spamc.pm
 lib/PublicInbox/Spawn.pm
 lib/PublicInbox/SpawnPP.pm
 lib/PublicInbox/Syscall.pm
+lib/PublicInbox/TLS.pm
 lib/PublicInbox/Unsubscribe.pm
 lib/PublicInbox/UserContent.pm
 lib/PublicInbox/V2Writable.pm
@@ -187,6 +192,7 @@ t/content_id.t
 t/convert-compact.t
 t/data/0001.patch
 t/ds-leak.t
+t/ds-poll.t
 t/edit.t
 t/emergency.t
 t/fail-bin/spamc
@@ -222,6 +228,7 @@ t/msg_iter.t
 t/msgmap.t
 t/msgtime.t
 t/nntp.t
+t/nntpd-tls.t
 t/nntpd.t
 t/nulsubject.t
 t/over.t
index 238220726cab993dc6b0d5e61ae17ddebf018247..adcf91e55ea50021610e302be45f0fbaf478ba21 100644 (file)
@@ -77,7 +77,7 @@ changed = \$(shell git ls-files -m)
 %.syntax ::
        @\$(PERL) -w -I lib -c \$(subst .syntax,,\$@)
 
-syntax:: \$(my_syntax)
+syntax:: \$(filter-out lib/PublicInbox/DSKQXS.pm.syntax,\$(my_syntax))
 
 dsyn :: \$(addsuffix .syntax, \$(filter \$(changed), \$(syn_files)))
 
diff --git a/certs/.gitignore b/certs/.gitignore
new file mode 100644 (file)
index 0000000..0b3a547
--- /dev/null
@@ -0,0 +1,4 @@
+*.pem
+*.der
+*.enc
+*.p12
diff --git a/certs/create-certs.perl b/certs/create-certs.perl
new file mode 100755 (executable)
index 0000000..476be4d
--- /dev/null
@@ -0,0 +1,132 @@
+#!/usr/bin/perl -w
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# from IO::Socket::SSL 2.063 / https://github.com/noxxi/p5-io-socket-ssl
+use strict;
+use warnings;
+use IO::Socket::SSL::Utils;
+use Net::SSLeay;
+
+my $dir = "./";
+my $now = time();
+my $later = 0x7fffffff; # 2038 problems on 32-bit :<
+
+Net::SSLeay::SSLeay_add_ssl_algorithms();
+my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die;
+my $printfp = sub {
+    my ($w,$cert) = @_;
+    print $w.' sha256$'.unpack('H*',Net::SSLeay::X509_digest($cert, $sha256))."\n"
+};
+
+my %time_valid = (not_before => $now, not_after => $later);
+
+my @ca = CERT_create(
+    CA => 1,
+    subject => { CN => 'IO::Socket::SSL Demo CA' },
+    %time_valid,
+);
+save('test-ca.pem',PEM_cert2string($ca[0]));
+
+my @server = CERT_create(
+    CA => 0,
+    subject => { CN => 'server.local' },
+    purpose => 'server',
+    issuer => \@ca,
+    %time_valid,
+);
+save('server-cert.pem',PEM_cert2string($server[0]));
+save('server-key.pem',PEM_key2string($server[1]));
+$printfp->(server => $server[0]);
+
+@server = CERT_create(
+    CA => 0,
+    subject => { CN => 'server2.local' },
+    purpose => 'server',
+    issuer => \@ca,
+    %time_valid,
+);
+save('server2-cert.pem',PEM_cert2string($server[0]));
+save('server2-key.pem',PEM_key2string($server[1]));
+$printfp->(server2 => $server[0]);
+
+@server = CERT_create(
+    CA => 0,
+    subject => { CN => 'server-ecc.local' },
+    purpose => 'server',
+    issuer => \@ca,
+    key => KEY_create_ec(),
+    %time_valid,
+);
+save('server-ecc-cert.pem',PEM_cert2string($server[0]));
+save('server-ecc-key.pem',PEM_key2string($server[1]));
+$printfp->('server-ecc' => $server[0]);
+
+
+my @client = CERT_create(
+    CA => 0,
+    subject => { CN => 'client.local' },
+    purpose => 'client',
+    issuer => \@ca,
+    %time_valid,
+);
+save('client-cert.pem',PEM_cert2string($client[0]));
+save('client-key.pem',PEM_key2string($client[1]));
+$printfp->(client => $client[0]);
+
+my @swc = CERT_create(
+    CA => 0,
+    subject => { CN => 'server.local' },
+    purpose => 'server',
+    issuer => \@ca,
+    subjectAltNames => [
+       [ DNS => '*.server.local' ],
+       [ IP => '127.0.0.1' ],
+       [ DNS => 'www*.other.local' ],
+       [ DNS => 'smtp.mydomain.local' ],
+       [ DNS => 'xn--lwe-sna.idntest.local' ]
+    ],
+    %time_valid,
+);
+save('server-wildcard.pem',PEM_cert2string($swc[0]),PEM_key2string($swc[1]));
+
+
+my @subca = CERT_create(
+    CA => 1,
+    issuer => \@ca,
+    subject => { CN => 'IO::Socket::SSL Demo Sub CA' },
+    %time_valid,
+);
+save('test-subca.pem',PEM_cert2string($subca[0]));
+@server = CERT_create(
+    CA => 0,
+    subject => { CN => 'server.local' },
+    purpose => 'server',
+    issuer => \@subca,
+    %time_valid,
+);
+save('sub-server.pem',PEM_cert2string($server[0]).PEM_key2string($server[1]));
+
+
+
+my @cap = CERT_create(
+    CA => 1,
+    subject => { CN => 'IO::Socket::SSL::Intercept' },
+    %time_valid,
+);
+save('proxyca.pem',PEM_cert2string($cap[0]).PEM_key2string($cap[1]));
+
+sub save {
+    my $file = shift;
+    open(my $fd,'>',$dir.$file) or die $!;
+    print $fd @_;
+}
+
+system(<<CMD);
+cd $dir
+set -x
+openssl x509 -in server-cert.pem -out server-cert.der -outform der
+openssl rsa -in server-key.pem -out server-key.der -outform der
+openssl rsa -in server-key.pem -out server-key.enc -passout pass:bluebell
+openssl rsa -in client-key.pem -out client-key.enc -passout pass:opossum
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server.p12 -passout pass:
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server_enc.p12 -passout pass:bluebell
+CMD
index 2b04886ab50e171c57187ee80ec01977e207c497..08f4e9e8a3389937d9dde1fad0d1e50d687e4202 100644 (file)
@@ -17,40 +17,28 @@ package PublicInbox::DS;
 use strict;
 use bytes;
 use POSIX ();
-use Time::HiRes ();
 use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
-
+use Fcntl qw(SEEK_SET :DEFAULT);
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+use parent qw(Exporter);
+our @EXPORT_OK = qw(now msg_more);
 use warnings;
+use 5.010_001;
 
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
-            'wbuf',              # arrayref of scalars, scalarrefs, or coderefs to write
+            'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
-            'closed',            # bool: socket is closed
-            'event_watch',       # bitmask of events the client is interested in (POLLIN,OUT,etc.)
             );
 
-use Errno  qw(EAGAIN EINVAL);
-use Carp   qw(croak confess);
-
-use constant DebugLevel => 0;
-
-use constant POLLIN        => 1;
-use constant POLLOUT       => 4;
-use constant POLLERR       => 8;
-use constant POLLHUP       => 16;
-use constant POLLNVAL      => 32;
-
-our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
+use Errno  qw(EAGAIN EINVAL EEXIST);
+use Carp   qw(croak confess carp);
+require File::Spec;
 
 our (
-     $HaveEpoll,                 # Flag -- is epoll available?  initially undefined.
-     $HaveKQueue,
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
-     $Epoll,                     # Global epoll fd (for epoll mode only)
-     $KQueue,                    # Global kqueue fd ref (for kqueue mode only)
+     $Epoll,                     # Global epoll fd (or DSKQXS ref)
      $_io,                       # IO::Handle for Epoll
      @ToClose,                   # sockets to close when event loop is done
 
@@ -61,8 +49,6 @@ our (
      @Timers,                    # timers
      );
 
-# this may be set to zero with old kernels
-our $EPOLLEXCLUSIVE = EPOLLEXCLUSIVE;
 Reset();
 
 #####################################################################
@@ -83,13 +69,8 @@ sub Reset {
     $PostLoopCallback = undef;
     $DoneInit = 0;
 
-    # NOTE kqueue is close-on-fork, and we don't account for it, yet
-    # OTOH, we (public-inbox) don't need this sub outside of tests...
-    POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0;
-    $KQueue = undef;
-
-    $_io = undef; # close $Epoll
-    $Epoll = undef;
+    $_io = undef; # closes real $Epoll FD
+    $Epoll = undef; # may call DSKQXS::DESTROY
 
     *EventLoop = *FirstTimeEventLoop;
 }
@@ -106,18 +87,6 @@ sub SetLoopTimeout {
     return $LoopTimeout = $_[1] + 0;
 }
 
-=head2 C<< CLASS->DebugMsg( $format, @args ) >>
-
-Print the debugging message specified by the C<sprintf>-style I<format> and
-I<args>
-
-=cut
-sub DebugMsg {
-    my ( $class, $fmt, @args ) = @_;
-    chomp $fmt;
-    printf STDERR ">>> $fmt\n", @args;
-}
-
 =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
 
 Add a timer to occur $seconds from now. $seconds may be fractional, but timers
@@ -127,10 +96,15 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
 
 =cut
 sub AddTimer {
-    my $class = shift;
-    my ($secs, $coderef) = @_;
+    my ($class, $secs, $coderef) = @_;
 
-    my $fire_time = Time::HiRes::time() + $secs;
+    if (!$secs) {
+        my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
+        unshift(@Timers, $timer);
+        return $timer;
+    }
+
+    my $fire_time = now() + $secs;
 
     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
 
@@ -168,26 +142,19 @@ sub _InitPoller
     return if $DoneInit;
     $DoneInit = 1;
 
-    if ($HAVE_KQUEUE) {
-        $KQueue = IO::KQueue->new();
-        $HaveKQueue = defined $KQueue;
-        if ($HaveKQueue) {
-            *EventLoop = *KQueueEventLoop;
-        }
-    }
-    elsif (PublicInbox::Syscall::epoll_defined()) {
-        $Epoll = eval { epoll_create(1024); };
-        $HaveEpoll = defined $Epoll && $Epoll >= 0;
-        if ($HaveEpoll) {
-            set_cloexec($Epoll);
-            *EventLoop = *EpollEventLoop;
+    if (PublicInbox::Syscall::epoll_defined())  {
+        $Epoll = epoll_create();
+        set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+    } else {
+        my $cls;
+        for (qw(DSKQXS DSPoll)) {
+            $cls = "PublicInbox::$_";
+            last if eval "require $cls";
         }
+        $cls->import;
+        $Epoll = $cls->new;
     }
-
-    if (!$HaveEpoll && !$HaveKQueue) {
-        require IO::Poll;
-        *EventLoop = *PollEventLoop;
-    }
+    *EventLoop = *EpollEventLoop;
 }
 
 =head2 C<< CLASS->EventLoop() >>
@@ -201,20 +168,16 @@ sub FirstTimeEventLoop {
 
     _InitPoller();
 
-    if ($HaveEpoll) {
-        EpollEventLoop($class);
-    } elsif ($HaveKQueue) {
-        KQueueEventLoop($class);
-    } else {
-        PollEventLoop($class);
-    }
+    EventLoop($class);
 }
 
+sub now () { clock_gettime(CLOCK_MONOTONIC) }
+
 # runs timers and returns milliseconds for next one, or next event loop
 sub RunTimers {
     return $LoopTimeout unless @Timers;
 
-    my $now = Time::HiRes::time();
+    my $now = now();
 
     # Run expired timers
     while (@Timers && $Timers[0][0] <= $now) {
@@ -239,11 +202,7 @@ sub RunTimers {
     return $timeout;
 }
 
-### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
-### okay.
 sub EpollEventLoop {
-    my $class = shift;
-
     while (1) {
         my @events;
         my $i;
@@ -260,78 +219,6 @@ sub EpollEventLoop {
         }
         return unless PostEventLoop();
     }
-    exit 0;
-}
-
-### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
-### IO::Epoll fails to load.
-sub PollEventLoop {
-    my $class = shift;
-
-    my PublicInbox::DS $pob;
-
-    while (1) {
-        my $timeout = RunTimers();
-
-        # the following sets up @poll as a series of ($poll,$event_mask)
-        # items, then uses IO::Poll::_poll, implemented in XS, which
-        # modifies the array in place with the even elements being
-        # replaced with the event masks that occured.
-        my @poll;
-        while ( my ($fd, $sock) = each %DescriptorMap ) {
-            push @poll, $fd, $sock->{event_watch};
-        }
-
-        # if nothing to poll, either end immediately (if no timeout)
-        # or just keep calling the callback
-        unless (@poll) {
-            select undef, undef, undef, ($timeout / 1000);
-            return unless PostEventLoop();
-            next;
-        }
-
-        my $count = IO::Poll::_poll($timeout, @poll);
-        unless ($count >= 0) {
-            return unless PostEventLoop();
-            next;
-        }
-
-        # Fetch handles with read events
-        while (@poll) {
-            my ($fd, $state) = splice(@poll, 0, 2);
-            $DescriptorMap{$fd}->event_step if $state;
-        }
-
-        return unless PostEventLoop();
-    }
-
-    exit 0;
-}
-
-### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
-### okay.
-sub KQueueEventLoop {
-    my $class = shift;
-
-    while (1) {
-        my $timeout = RunTimers();
-        my @ret = eval { $KQueue->kevent($timeout) };
-        if (my $err = $@) {
-            # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
-            if ($err =~ /Interrupted system call/) {
-                @ret = ();
-            } else {
-                die $err;
-            }
-        }
-
-        foreach my $kev (@ret) {
-            $DescriptorMap{$kev->[0]}->event_step;
-        }
-        return unless PostEventLoop();
-    }
-
-    exit(0);
 }
 
 =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
@@ -362,11 +249,11 @@ sub PostEventLoop {
     while (my $sock = shift @ToClose) {
         my $fd = fileno($sock);
 
-        # close the socket.  (not a PublicInbox::DS close)
-        $sock->close;
+        # close the socket. (not a PublicInbox::DS close)
+        CORE::close($sock);
 
         # and now we can finally remove the fd from the map.  see
-        # comment above in _cleanup.
+        # comment above in ->close.
         delete $DescriptorMap{$fd};
     }
 
@@ -400,7 +287,7 @@ This is normally (always?) called from your subclass via:
 
 =cut
 sub new {
-    my ($self, $sock, $exclusive) = @_;
+    my ($self, $sock, $ev) = @_;
     $self = fields::new($self) unless ref $self;
 
     $self->{sock} = $sock;
@@ -409,36 +296,15 @@ sub new {
     Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
         unless $sock && $fd;
 
-    $self->{wbuf} = [];
-    $self->{wbuf_off} = 0;
-    $self->{closed} = 0;
-
-    my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
-
     _InitPoller();
 
-    if ($HaveEpoll) {
-        if ($exclusive) {
-            $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP|$EPOLLEXCLUSIVE;
+    if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
+        if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
+            $ev &= ~EPOLLEXCLUSIVE;
+            goto retry;
         }
-retry:
-        if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
-            if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) {
-                $EPOLLEXCLUSIVE = 0; # old kernel
-                $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP;
-                goto retry;
-            }
-            die "couldn't add epoll watch for $fd: $!\n";
-        }
-    }
-    elsif ($HaveKQueue) {
-        # Add them to the queue but disabled for now
-        $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
-                        IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
-        $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
-                        IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
+        die "couldn't add epoll watch for $fd: $!\n";
     }
-
     Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
         if $DescriptorMap{$fd};
 
@@ -457,74 +323,148 @@ Close the socket.
 
 =cut
 sub close {
-    my PublicInbox::DS $self = $_[0];
-    return if $self->{closed};
-
-    # this does most of the work of closing us
-    $self->_cleanup();
-
-    # defer closing the actual socket until the event loop is done
-    # processing this round of events.  (otherwise we might reuse fds)
-    if (my $sock = delete $self->{sock}) {
-        push @ToClose, $sock;
-    }
-
-    return 0;
-}
-
-### METHOD: _cleanup()
-### Called by our closers so we can clean internal data structures.
-sub _cleanup {
-    my PublicInbox::DS $self = $_[0];
-
-    # we're effectively closed; we have no fd and sock when we leave here
-    $self->{closed} = 1;
+    my ($self) = @_;
+    my $sock = delete $self->{sock} or return;
 
     # we need to flush our write buffer, as there may
     # be self-referential closures (sub { $client->close })
     # preventing the object from being destroyed
-    @{$self->{wbuf}} = ();
+    delete $self->{wbuf};
 
     # if we're using epoll, we have to remove this from our epoll fd so we stop getting
     # notifications about it
-    if ($HaveEpoll && $self->{sock}) {
-        my $fd = fileno($self->{sock});
-        epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
-            confess("EPOLL_CTL_DEL: $!");
-    }
+    my $fd = fileno($sock);
+    epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
+        confess("EPOLL_CTL_DEL: $!");
 
     # we explicitly don't delete from DescriptorMap here until we
     # actually close the socket, as we might be in the middle of
     # processing an epoll_wait/etc that returned hundreds of fds, one
     # of which is not yet processed and is what we're closing.  if we
     # keep it in DescriptorMap, then the event harnesses can just
-    # looked at $pob->{closed} and ignore it.  but if it's an
+    # looked at $pob->{sock} == undef and ignore it.  but if it's an
     # un-accounted for fd, then it (understandably) freak out a bit
     # and emit warnings, thinking their state got off.
+
+    # defer closing the actual socket until the event loop is done
+    # processing this round of events.  (otherwise we might reuse fds)
+    push @ToClose, $sock;
+
+    return 0;
 }
 
-=head2 C<< $obj->sock() >>
+# portable, non-thread-safe sendfile emulation (no pread, yet)
+sub psendfile ($$$) {
+    my ($sock, $fh, $off) = @_;
+
+    seek($fh, $$off, SEEK_SET) or return;
+    defined(my $to_write = read($fh, my $buf, 16384)) or return;
+    my $written = 0;
+    while ($to_write > 0) {
+        if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
+            $written += $w;
+            $to_write -= $w;
+        } else {
+            return if $written == 0;
+            last;
+        }
+    }
+    $$off += $written;
+    $written;
+}
 
-Returns the underlying IO::Handle for the object.
+# returns 1 if done, 0 if incomplete
+sub flush_write ($) {
+    my ($self) = @_;
+    my $wbuf = $self->{wbuf} or return 1;
+    my $sock = $self->{sock};
+
+next_buf:
+    while (my $bref = $wbuf->[0]) {
+        if (ref($bref) ne 'CODE') {
+            my $off = delete($self->{wbuf_off}) // 0;
+            while ($sock) {
+                my $w = psendfile($sock, $bref, \$off);
+                if (defined $w) {
+                    if ($w == 0) {
+                        shift @$wbuf;
+                        goto next_buf;
+                    }
+                } elsif ($! == EAGAIN) {
+                    $self->{wbuf_off} = $off;
+                    watch($self, EPOLLOUT|EPOLLONESHOT);
+                    return 0;
+                } else {
+                    return $self->close;
+                }
+            }
+        } else { #($ref eq 'CODE') {
+            shift @$wbuf;
+            my $before = scalar(@$wbuf);
+            $bref->($self);
 
-=cut
-sub sock {
-    my PublicInbox::DS $self = shift;
-    return $self->{sock};
+            # bref may be enqueueing more CODE to call (see accept_tls_step)
+            return 0 if (scalar(@$wbuf) > $before);
+        }
+    } # while @$wbuf
+
+    delete $self->{wbuf};
+    1; # all done
+}
+
+sub do_read ($$$$) {
+    my ($self, $rbuf, $len, $off) = @_;
+    my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+    return ($r == 0 ? $self->close : $r) if defined $r;
+    # common for clients to break connections without warning,
+    # would be too noisy to log here:
+    if (ref($self) eq 'IO::Socket::SSL') {
+        my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+        watch($self, $ev | EPOLLONESHOT);
+    } elsif ($! == EAGAIN) {
+        watch($self, EPOLLIN | EPOLLONESHOT);
+    } else {
+        $self->close;
+    }
+}
+
+# drop the socket if we hit unrecoverable errors on our system which
+# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop {
+    my $self = shift;
+    carp(@_);
+    $self->close;
+}
+
+# n.b.: use ->write/->read for this buffer to allow compatibility with
+# PerlIO::mmap or PerlIO::scalar if needed
+sub tmpio ($$$) {
+    my ($self, $bref, $off) = @_;
+    my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
+    do {
+        my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
+        if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
+            unlink($fn) or return drop($self, "unlink($fn) $!");
+        } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
+            return drop($self, "open: $!");
+        }
+    } until (defined $fh);
+    $fh->autoflush(1);
+    my $len = bytes::length($$bref) - $off;
+    $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
+    $fh
 }
 
 =head2 C<< $obj->write( $data ) >>
 
 Write the specified data to the underlying handle.  I<data> may be scalar,
-scalar ref, code ref (to run when there), or undef just to kick-start.
+scalar ref, code ref (to run when there).
 Returns 1 if writes all went through, or 0 if there are writes in queue. If
 it returns 1, caller should stop waiting for 'writable' events)
 
 =cut
 sub write {
-    my PublicInbox::DS $self;
-    my $data;
-    ($self, $data) = @_;
+    my ($self, $data) = @_;
 
     # nobody should be writing to closed sockets, but caller code can
     # do two writes within an event, have the first fail and
@@ -533,203 +473,113 @@ sub write {
     # now-dead object does its second write.  that is this case.  we
     # just lie and say it worked.  it'll be dead soon and won't be
     # hurt by this lie.
-    return 1 if $self->{closed};
-
-    my $bref;
-
-    # just queue data if there's already a wait
-    my $need_queue;
+    my $sock = $self->{sock} or return 1;
+    my $ref = ref $data;
+    my $bref = $ref ? $data : \$data;
     my $wbuf = $self->{wbuf};
-
-    if (defined $data) {
-        $bref = ref $data ? $data : \$data;
-        if (scalar @$wbuf) {
+    if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more...
+        if ($ref eq 'CODE') {
             push @$wbuf, $bref;
-            return 0;
-        }
-
-        # this flag says we're bypassing the queue system, knowing we're the
-        # only outstanding write, and hoping we don't ever need to use it.
-        # if so later, though, we'll need to queue
-        $need_queue = 1;
-    }
-
-  WRITE:
-    while (1) {
-        return 1 unless $bref ||= $wbuf->[0];
-
-        my $len;
-        eval {
-            $len = length($$bref); # this will die if $bref is a code ref, caught below
-        };
-        if ($@) {
-            if (UNIVERSAL::isa($bref, "CODE")) {
-                unless ($need_queue) {
-                    shift @$wbuf;
-                }
-                $bref->();
-
-                # code refs are just run and never get reenqueued
-                # (they're one-shot), so turn off the flag indicating the
-                # outstanding data needs queueing.
-                $need_queue = 0;
-
-                undef $bref;
-                next WRITE;
+        } else {
+            my $last = $wbuf->[-1];
+            if (ref($last) eq 'GLOB') { # append to tmp file buffer
+                $last->print($$bref) or return drop($self, "print: $!");
+            } else {
+                my $tmpio = tmpio($self, $bref, 0) or return 0;
+                push @$wbuf, $tmpio;
             }
-            die "Write error: $@ <$bref>";
         }
-
-        my $to_write = $len - $self->{wbuf_off};
-        my $written = syswrite($self->{sock}, $$bref, $to_write,
-                               $self->{wbuf_off});
-
-        if (! defined $written) {
-            if ($! == EAGAIN) {
-                # since connection has stuff to write, it should now be
-                # interested in pending writes:
-                if ($need_queue) {
-                    push @$wbuf, $bref;
-                }
-                $self->watch_write(1);
-                return 0;
-            }
-
+        return 0;
+    } elsif ($ref eq 'CODE') {
+        $bref->($self);
+        return 1;
+    } else {
+        my $to_write = bytes::length($$bref);
+        my $written = syswrite($sock, $$bref, $to_write);
+
+        if (defined $written) {
+            return 1 if $written == $to_write;
+        } elsif ($! == EAGAIN) {
+            $written = 0;
+        } else {
             return $self->close;
-        } elsif ($written != $to_write) {
-            if ($need_queue) {
-                push @$wbuf, $bref;
-            }
-            # since connection has stuff to write, it should now be
-            # interested in pending writes:
-            $self->{wbuf_off} += $written;
-            $self->on_incomplete_write;
-            return 0;
-        } elsif ($written == $to_write) {
-            $self->{wbuf_off} = 0;
-            $self->watch_write(0);
-
-            # this was our only write, so we can return immediately
-            # since we avoided incrementing the buffer size or
-            # putting it in the buffer.  we also know there
-            # can't be anything else to write.
-            return 1 if $need_queue;
-
-            shift @$wbuf;
-            undef $bref;
-            next WRITE;
         }
-    }
-}
-
-sub on_incomplete_write {
-    my PublicInbox::DS $self = shift;
-    $self->watch_write(1);
-}
-
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
+        my $tmpio = tmpio($self, $bref, $written) or return 0;
 
-=cut
-sub watch_read {
-    my PublicInbox::DS $self = shift;
-    return if $self->{closed} || !$self->{sock};
-
-    my $val = shift;
-    my $event = $self->{event_watch};
-
-    $event &= ~POLLIN if ! $val;
-    $event |=  POLLIN if   $val;
-
-    my $fd = fileno($self->{sock});
-    # If it changed, set it
-    if ($event != $self->{event_watch}) {
-        if ($HaveKQueue) {
-            $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
-                            $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
-        }
-        elsif ($HaveEpoll) {
-            epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
-                confess("EPOLL_CTL_MOD: $!");
-        }
-        $self->{event_watch} = $event;
+        # wbuf may be an empty array if we're being called inside
+        # ->flush_write via CODE bref:
+        push @{$self->{wbuf} ||= []}, $tmpio;
+        watch($self, EPOLLOUT|EPOLLONESHOT);
+        return 0;
     }
 }
 
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write {
-    my PublicInbox::DS $self = shift;
-    return if $self->{closed} || !$self->{sock};
-
-    my $val = shift;
-    my $event = $self->{event_watch};
-
-    $event &= ~POLLOUT if ! $val;
-    $event |=  POLLOUT if   $val;
-    my $fd = fileno($self->{sock});
-
-    # If it changed, set it
-    if ($event != $self->{event_watch}) {
-        if ($HaveKQueue) {
-            $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
-                            $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
-        }
-        elsif ($HaveEpoll) {
-            epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
-                    confess "EPOLL_CTL_MOD: $!";
+use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
+
+sub msg_more ($$) {
+    my $self = $_[0];
+    my $sock = $self->{sock} or return 1;
+
+    if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
+        my $n = send($sock, $_[1], MSG_MORE);
+        if (defined $n) {
+            my $nlen = bytes::length($_[1]) - $n;
+            return 1 if $nlen == 0; # all done!
+            # queue up the unwritten substring:
+            my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+            $self->{wbuf} = [ $tmpio ];
+            watch($self, EPOLLOUT|EPOLLONESHOT);
+            return 0;
         }
-        $self->{event_watch} = $event;
     }
+    $self->write(\($_[1]));
 }
 
-=head2 C<< $obj->dump_error( $message ) >>
-
-Prints to STDERR a backtrace with information about this socket and what lead
-up to the dump_error call.
+sub watch ($$) {
+    my ($self, $ev) = @_;
+    my $sock = $self->{sock} or return;
+    epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
+        confess("EPOLL_CTL_MOD $!");
+    0;
+}
 
-=cut
-sub dump_error {
-    my $i = 0;
-    my @list;
-    while (my ($file, $line, $sub) = (caller($i++))[1..3]) {
-        push @list, "\t$file:$line called $sub\n";
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
+
+# return true if complete, false if incomplete (or failure)
+sub accept_tls_step ($) {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return;
+    return 1 if $sock->accept_SSL;
+    return $self->close if $! != EAGAIN;
+    if (my $ev = PublicInbox::TLS::epollbit()) {
+        unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+        return watch($self, $ev | EPOLLONESHOT);
     }
-
-    warn "ERROR: $_[1]\n" .
-        "\t$_[0] = " . $_[0]->as_string . "\n" .
-        join('', @list);
+    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
 }
 
-=head2 C<< $obj->debugmsg( $format, @args ) >>
-
-Print the debugging message specified by the C<sprintf>-style I<format> and
-I<args>.
-
-=cut
-sub debugmsg {
-    my ( $self, $fmt, @args ) = @_;
-    confess "Not an object" unless ref $self;
-
-    chomp $fmt;
-    printf STDERR ">>> $fmt\n", @args;
+sub shutdn_tls_step ($) {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return;
+    return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
+    return $self->close if $! != EAGAIN;
+    if (my $ev = PublicInbox::TLS::epollbit()) {
+        unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+        return watch($self, $ev | EPOLLONESHOT);
+    }
+    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
 }
 
-=head2 C<< $obj->as_string() >>
-
-Returns a string describing this socket.
-
-=cut
-sub as_string {
-    my PublicInbox::DS $self = shift;
-    my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
-                   ($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
-    my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
-    return $ret;
+# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
+# or fork w/o exec, so no inadvertant socket sharing
+sub shutdn ($) {
+    my ($self) = @_;
+    my $sock = $self->{sock} or return;
+    if (ref($sock) eq 'IO::Socket::SSL') {
+        shutdn_tls_step($self);
+    } else {
+       $self->close;
+    }
 }
 
 package PublicInbox::DS::Timer;
diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm
new file mode 100644 (file)
index 0000000..38e1344
--- /dev/null
@@ -0,0 +1,73 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+#  <https://www.gnu.org/licenses/gpl-1.0.txt>
+#  <https://dev.perl.org/licenses/artistic.html>
+#
+# kqueue support via IO::KQueue XS module.  This makes kqueue look
+# like epoll to simplify the code in DS.pm.  This is NOT meant to be
+# an all encompassing emulation of epoll via IO::KQueue, but just to
+# support cases public-inbox-nntpd/httpd care about.
+# A pure-Perl version using syscall() is planned, and it should be
+# faster due to the lack of syscall overhead.
+package PublicInbox::DSKQXS;
+use strict;
+use warnings;
+use parent qw(IO::KQueue);
+use parent qw(Exporter);
+use IO::KQueue;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
+
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+       my ($bit, $ev) = @_;
+       if ($ev & $bit) {
+               my $fl = EV_ADD | EV_ENABLE;
+               ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
+       } else {
+               EV_ADD | EV_DISABLE;
+       }
+}
+
+sub new {
+       my ($class) = @_;
+       die 'non-singleton use not supported' if $owner_pid == $$;
+       $owner_pid = $$;
+       $class->SUPER::new;
+}
+
+sub epoll_ctl {
+       my ($self, $op, $fd, $ev) = @_;
+       if ($op != EPOLL_CTL_DEL) {
+               $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev));
+               $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev));
+       }
+       0;
+}
+
+sub epoll_wait {
+       my ($self, $maxevents, $timeout_msec, $events) = @_;
+       @$events = eval { $self->kevent($timeout_msec) };
+       if (my $err = $@) {
+               # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
+               if ($err =~ /Interrupted system call/) {
+                       @$events = ();
+               } else {
+                       die $err;
+               }
+       }
+       # caller only cares for $events[$i]->[0]
+       scalar(@$events);
+}
+
+sub DESTROY {
+       my ($self) = @_;
+       if ($owner_pid == $$) {
+               POSIX::close($$self);
+               $owner_pid = -1;
+       }
+}
+
+1;
diff --git a/lib/PublicInbox/DSPoll.pm b/lib/PublicInbox/DSPoll.pm
new file mode 100644 (file)
index 0000000..e65640a
--- /dev/null
@@ -0,0 +1,58 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+#  <https://www.gnu.org/licenses/gpl-1.0.txt>
+#  <https://dev.perl.org/licenses/artistic.html>
+#
+# poll(2) via IO::Poll core module.  This makes poll look
+# like epoll to simplify the code in DS.pm.  This is NOT meant to be
+# an all encompassing emulation of epoll via IO::Poll, but just to
+# support cases public-inbox-nntpd/httpd care about.
+package PublicInbox::DSPoll;
+use strict;
+use warnings;
+use parent qw(Exporter);
+use IO::Poll;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+
+sub new { bless {}, $_[0] } # fd => events
+
+sub epoll_ctl {
+       my ($self, $op, $fd, $ev) = @_;
+
+       # not wasting time on error checking
+       if ($op != EPOLL_CTL_DEL) {
+               $self->{$fd} = $ev;
+       } else {
+               delete $self->{$fd};
+       }
+       0;
+}
+
+sub epoll_wait {
+       my ($self, $maxevents, $timeout_msec, $events) = @_;
+       my @pset;
+       while (my ($fd, $events) = each %$self) {
+               my $pevents = $events & EPOLLIN ? POLLIN : 0;
+               $pevents |= $events & EPOLLOUT ? POLLOUT : 0;
+               push(@pset, $fd, $pevents);
+       }
+       @$events = ();
+       my $n = IO::Poll::_poll($timeout_msec, @pset);
+       if ($n >= 0) {
+               for (my $i = 0; $i < @pset; ) {
+                       my $fd = $pset[$i++];
+                       my $revents = $pset[$i++] or next;
+                       delete($self->{$fd}) if $self->{$fd} & EPOLLONESHOT;
+                       push @$events, [ $fd ];
+               }
+               my $nevents = scalar @$events;
+               if ($n != $nevents) {
+                       warn "BUG? poll() returned $n, but got $nevents";
+               }
+       }
+       $n;
+}
+
+1;
index 227ba5f979d09ef23eebaf892284b27137222165..cf011a2020b560d544766021435b822d4e32ff21 100644 (file)
@@ -8,11 +8,12 @@ use warnings;
 use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
 use IO::Handle;
 use IO::Socket;
+use Socket qw(IPPROTO_TCP SOL_SOCKET);
+sub SO_ACCEPTFILTER () { 0x1000 }
 use Cwd qw/abs_path/;
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
-require PublicInbox::DS;
+use PublicInbox::DS qw(now);
 require PublicInbox::EvCleanup;
 require POSIX;
 require PublicInbox::Listener;
@@ -23,11 +24,55 @@ my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
 my $worker_processes = 1;
 my @listeners;
 my %pids;
-my %listener_names;
+my %listener_names; # sockname => IO::Handle
+my %tls_opt; # scheme://sockname => args for IO::Socket::SSL->start_SSL
 my $reexec_pid;
 my $cleanup;
 my ($uid, $gid);
+my ($default_cert, $default_key);
 END { $cleanup->() if $cleanup };
+my %KNOWN_TLS = ( 443 => 'https', 563 => 'nntps' );
+my %KNOWN_STARTTLS = ( 119 => 'nntp' );
+
+sub accept_tls_opt ($) {
+       my ($opt_str) = @_;
+       # opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value)
+       require PublicInbox::TLS;
+       my $o = {};
+       # allow ',' as delimiter since '&' is shell-unfriendly
+       foreach (split(/[,&]/, $opt_str)) {
+               my ($k, $v) = split(/=/, $_, 2);
+               push @{$o->{$k} ||= []}, $v;
+       }
+
+       # key may be a part of cert.  At least
+       # p5-io-socket-ssl/example/ssl_server.pl has this fallback:
+       $o->{cert} //= [ $default_cert ];
+       $o->{key} //= defined($default_key) ? [ $default_key ] : $o->{cert};
+       my %ctx_opt = (SSL_server => 1);
+       # parse out hostname:/path/to/ mappings:
+       foreach my $k (qw(cert key)) {
+               my $x = $ctx_opt{'SSL_'.$k.'_file'} = {};
+               foreach my $path (@{$o->{$k}}) {
+                       my $host = '';
+                       $path =~ s/\A([^:]+):// and $host = $1;
+                       $x->{$host} = $path;
+               }
+       }
+       my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or
+               die 'SSL_Context->new: '.PublicInbox::TLS::err();
+
+       # save ~34K per idle connection (cf. SSL_CTX_set_mode(3ssl))
+       # RSS goes from 346MB to 171MB with 10K idle NNTPS clients on amd64
+       # cf. https://rt.cpan.org/Ticket/Display.html?id=129463
+       my $mode = eval { Net::SSLeay::MODE_RELEASE_BUFFERS() };
+       if ($mode && $ctx->{context}) {
+               eval { Net::SSLeay::CTX_set_mode($ctx->{context}, $mode) };
+               warn "W: $@ (setting SSL_MODE_RELEASE_BUFFERS)\n" if $@;
+       }
+
+       { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
+}
 
 sub daemon_prepare ($) {
        my ($default_listen) = @_;
@@ -43,6 +88,8 @@ sub daemon_prepare ($) {
                'u|user=s' => \$user,
                'g|group=s' => \$group,
                'D|daemonize' => \$daemonize,
+               'cert=s' => \$default_cert,
+               'key=s' => \$default_key,
        );
        GetOptions(%opts) or die "bad command-line args\n";
 
@@ -50,12 +97,34 @@ sub daemon_prepare ($) {
                die "--pid-file cannot end with '.oldbin'\n";
        }
        @listeners = inherit();
+
+       # allow socket-activation users to set certs once and not
+       # have to configure each socket:
+       my @inherited_names = keys(%listener_names) if defined($default_cert);
+
        # ignore daemonize when inheriting
        $daemonize = undef if scalar @listeners;
 
        push @cfg_listen, $default_listen unless (@listeners || @cfg_listen);
 
        foreach my $l (@cfg_listen) {
+               my $orig = $l;
+               my $scheme = '';
+               if ($l =~ s!\A([^:]+)://!!) {
+                       $scheme = $1;
+               } elsif ($l =~ /\A(?:\[[^\]]+\]|[^:]+):([0-9])+/) {
+                       my $s = $KNOWN_TLS{$1} // $KNOWN_STARTTLS{$1};
+                       $scheme = $s if defined $s;
+               }
+               if ($l =~ s!/?\?(.+)\z!!) {
+                       $tls_opt{"$scheme://$l"} = accept_tls_opt($1);
+               } elsif (defined($default_cert)) {
+                       $tls_opt{"$scheme://$l"} = accept_tls_opt('');
+               } elsif ($scheme =~ /\A(?:nntps|https)\z/) {
+                       die "$orig specified w/o cert=\n";
+               }
+               # TODO: use scheme to load either NNTP.pm or HTTP.pm
+
                next if $listener_names{$l}; # already inherited
                my (%o, $sock_pkg);
                if (index($l, '/') == 0) {
@@ -92,6 +161,20 @@ sub daemon_prepare ($) {
                        push @listeners, $s;
                }
        }
+
+       # cert/key options in @cfg_listen takes precedence when inheriting,
+       # but map well-known inherited ports if --listen isn't specified
+       # at all
+       for my $sockname (@inherited_names) {
+               $sockname =~ /:([0-9]+)\z/ or next;
+               if (my $scheme = $KNOWN_TLS{$1}) {
+                       $tls_opt{"$scheme://$sockname"} ||= accept_tls_opt('');
+               } elsif (($scheme = $KNOWN_STARTTLS{$1})) {
+                       next if $tls_opt{"$scheme://$sockname"};
+                       $tls_opt{''} ||= accept_tls_opt('');
+               }
+       }
+
        die "No listeners bound\n" unless @listeners;
 }
 
@@ -183,7 +266,7 @@ sub worker_quit {
        PublicInbox::DS->SetPostLoopCallback(sub {
                my ($dmap, undef) = @_;
                my $n = 0;
-               my $now = clock_gettime(CLOCK_MONOTONIC);
+               my $now = now();
 
                foreach my $s (values %$dmap) {
                        $s->can('busy') or next;
@@ -195,9 +278,9 @@ sub worker_quit {
                        }
                }
                if ($n) {
-                       if (($warn + 5) < time) {
+                       if (($warn + 5) < now()) {
                                warn "$$ quitting, $n client(s) left\n";
-                               $warn = time;
+                               $warn = now();
                        }
                        unless (defined $proc_name) {
                                $proc_name = (split(/\s+/, $0))[0];
@@ -462,9 +545,43 @@ sub master_loop {
        exit # never gets here, just for documentation
 }
 
-sub daemon_loop ($$) {
-       my ($refresh, $post_accept) = @_;
+sub tls_start_cb ($$) {
+       my ($opt, $orig_post_accept) = @_;
+       sub {
+               my ($io, $addr, $srv) = @_;
+               my $ssl = IO::Socket::SSL->start_SSL($io, %$opt);
+               $orig_post_accept->($ssl, $addr, $srv);
+       }
+}
+
+sub defer_accept ($$) {
+       my ($s, $af_name) = @_;
+       return unless defined $af_name;
+       if ($^O eq 'linux') {
+               my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
+               return unless defined $x; # may be Unix socket
+               my $sec = unpack('i', $x);
+               return if $sec > 0; # systemd users may set a higher value
+               setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1);
+       } elsif ($^O eq 'freebsd') {
+               my $x = getsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER);
+               return if defined $x; # don't change if set
+               my $accf_arg = pack('a16a240', $af_name, '');
+               setsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER, $accf_arg);
+       }
+}
+
+sub daemon_loop ($$$$) {
+       my ($refresh, $post_accept, $nntpd, $af_default) = @_;
        PublicInbox::EvCleanup::enable(); # early for $refresh
+       my %post_accept;
+       while (my ($k, $v) = each %tls_opt) {
+               if ($k =~ s!\A(?:nntps|https)://!!) {
+                       $post_accept{$k} = tls_start_cb($v, $post_accept);
+               } elsif ($nntpd) { # STARTTLS, $k eq '' is OK
+                       $nntpd->{accept_tls} = $v;
+               }
+       }
        my $parent_pipe;
        if ($worker_processes > 0) {
                $refresh->(); # preload by default
@@ -483,20 +600,27 @@ sub daemon_loop ($$) {
        $SIG{HUP} = $refresh;
        $SIG{CHLD} = 'DEFAULT';
        $SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH);
-       # this calls epoll_create:
-       @listeners = map {
-               PublicInbox::Listener->new($_, $post_accept)
+       @listeners = map {;
+               my $tls_cb = $post_accept{sockname($_)};
+
+               # NNTPS, HTTPS, HTTP, and POP3S are client-first traffic
+               # NNTP and POP3 are server-first
+               defer_accept($_, $tls_cb ? 'dataready' : $af_default);
+
+               # this calls epoll_create:
+               PublicInbox::Listener->new($_, $tls_cb || $post_accept)
        } @listeners;
        PublicInbox::DS->EventLoop;
        $parent_pipe = undef;
 }
 
 
-sub run ($$$) {
-       my ($default, $refresh, $post_accept) = @_;
+sub run ($$$;$) {
+       my ($default, $refresh, $post_accept, $nntpd) = @_;
        daemon_prepare($default);
+       my $af_default = $default =~ /:8080\z/ ? 'httpready' : undef;
        daemonize();
-       daemon_loop($refresh, $post_accept);
+       daemon_loop($refresh, $post_accept, $nntpd, $af_default);
 }
 
 sub do_chown ($) {
index c64e2388512508cfea87624007adce585eadfc24..33b54ebc05fb6ef0c7af0d1cbbba7d1378e805e5 100644 (file)
@@ -6,6 +6,7 @@ package PublicInbox::EvCleanup;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
 
 my $ENABLED;
 sub enabled { $ENABLED }
@@ -23,13 +24,13 @@ sub once_init () {
        # fires in the next event loop iteration.
        pipe($r, $w) or die "pipe: $!";
        fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
-       $self->SUPER::new($w);
+       $self->SUPER::new($w, 0);
 
        # always writable, since PublicInbox::EvCleanup::event_step
        # never drains wbuf.  We can avoid wasting a hash slot by
        # stuffing the read-end of the pipe into the never-to-be-touched
        # wbuf
-       push @{$self->{wbuf}}, $r;
+       $self->{wbuf} = $r;
        $self;
 }
 
@@ -45,7 +46,9 @@ sub _run_all ($) {
 # ensure PublicInbox::DS::ToClose processing after timers fire
 sub _asap_close () { $asapq->[1] ||= _asap_timer() }
 
-sub _run_asap () { _run_all($asapq) }
+# Called by PublicInbox::DS
+sub event_step { _run_all($asapq) }
+
 sub _run_next () {
        _run_all($nextq);
        _asap_close();
@@ -56,16 +59,9 @@ sub _run_later () {
        _asap_close();
 }
 
-# Called by PublicInbox::DS
-sub event_step {
-       my ($self) = @_;
-       $self->watch_write(0);
-       _run_asap();
-}
-
 sub _asap_timer () {
        $singleton ||= once_init();
-       $singleton->watch_write(1);
+       $singleton->watch(EPOLLOUT|EPOLLONESHOT);
        1;
 }
 
@@ -88,7 +84,7 @@ sub later ($) {
 }
 
 END {
-       _run_asap();
+       event_step();
        _run_all($nextq);
        _run_all($laterq);
 }
index a2a81f8ea66a2dbe3ed81f885a5fda945e71bb38..303d5073850960943685719e6d49e297744979cb 100644 (file)
@@ -231,18 +231,16 @@ sub input_prepare {
                        return;
                }
                last if $r == 0;
-               my $off = 0;
-               while ($r > 0) {
-                       my $w = syswrite($in, $buf, $r, $off);
-                       if (defined $w) {
-                               $r -= $w;
-                               $off += $w;
-                       } else {
-                               err($env, "error writing temporary file: $!");
-                               return;
-                       }
+               unless (print $in $buf) {
+                       err($env, "error writing temporary file: $!");
+                       return;
                }
        }
+       # ensure it's visible to git-http-backend(1):
+       unless ($in->flush) {
+               err($env, "error writing temporary file: $!");
+               return;
+       }
        unless (defined(sysseek($in, 0, SEEK_SET))) {
                err($env, "error seeking temporary file: $!");
                return;
index 45bf23ec086702594cd0eed85298d4d52adb01b9..a1cb4aca9469c236bb090fa92c5deaafa834e009 100644 (file)
@@ -19,12 +19,15 @@ use HTTP::Status qw(status_message);
 use HTTP::Date qw(time2str);
 use IO::Handle;
 require PublicInbox::EvCleanup;
+PublicInbox::DS->import(qw(msg_more));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 use constant {
        CHUNK_START => -1,   # [a-f0-9]+\r\n
        CHUNK_END => -2,     # \r\n
        CHUNK_ZEND => -3,    # \r\n
        CHUNK_MAX_HDR => 256,
 };
+use Errno qw(EAGAIN);
 
 my $pipelineq = [];
 my $pipet;
@@ -33,7 +36,7 @@ sub process_pipelineq () {
        $pipet = undef;
        $pipelineq = [];
        foreach (@$q) {
-               next if $_->{closed};
+               next unless $_->{sock};
                rbuf_process($_);
        }
 }
@@ -55,39 +58,26 @@ sub http_date () {
 sub new ($$$) {
        my ($class, $sock, $addr, $httpd) = @_;
        my $self = fields::new($class);
-       $self->SUPER::new($sock);
+       $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
        $self->{httpd} = $httpd;
        $self->{rbuf} = '';
        ($self->{remote_addr}, $self->{remote_port}) =
                PublicInbox::Daemon::host_with_port($addr);
-       $self->watch_read(1);
        $self;
 }
 
 sub event_step { # called by PublicInbox::DS
        my ($self) = @_;
 
-       my $wbuf = $self->{wbuf};
-       if (@$wbuf) {
-               $self->write(undef);
-               return if $self->{closed} || scalar(@$wbuf);
-       }
+       return unless $self->flush_write && $self->{sock};
+
        # only read more requests if we've drained the write buffer,
        # otherwise we can be buffering infinitely w/o backpressure
 
        return read_input($self) if defined $self->{env};
-
-       my $off = length($self->{rbuf});
-       my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off);
-       if (defined $r) {
-               return $self->close if $r == 0;
-               return rbuf_process($self);
-       }
-       return if $!{EAGAIN}; # no need to call watch_read(1) again
-
-       # common for clients to break connections without warning,
-       # would be too noisy to log here:
-       return $self->close;
+       my $rbuf = \($self->{rbuf});
+       my $off = bytes::length($$rbuf);
+       $self->do_read($rbuf, 8192, $off) and rbuf_process($self);
 }
 
 sub rbuf_process {
@@ -100,10 +90,10 @@ sub rbuf_process {
        # (they are rarely-used and git (as of 2.7.2) does not use them)
        if ($r == -1 || $env{HTTP_TRAILER} ||
                        # this length-check is necessary for PURE_PERL=1:
-                       ($r == -2 && length($self->{rbuf}) > 0x4000)) {
+                       ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) {
                return quit($self, 400);
        }
-       return $self->watch_read(1) if $r < 0; # incomplete
+       return $self->watch_in1 if $r < 0; # incomplete
        $self->{rbuf} = substr($self->{rbuf}, $r);
 
        my $len = input_prepare($self, \%env);
@@ -112,6 +102,15 @@ sub rbuf_process {
        $len ? read_input($self) : app_dispatch($self);
 }
 
+# IO::Handle::write returns boolean, this returns bytes written:
+sub xwrite ($$$) {
+       my ($fh, $rbuf, $max) = @_;
+       my $w = bytes::length($$rbuf);
+       $w = $max if $w > $max;
+       $fh->write($$rbuf, $w) or return;
+       $w;
+}
+
 sub read_input ($) {
        my ($self) = @_;
        my $env = $self->{env};
@@ -120,14 +119,13 @@ sub read_input ($) {
 
        # env->{CONTENT_LENGTH} (identity)
        my $sock = $self->{sock};
-       my $len = $self->{input_left};
-       $self->{input_left} = undef;
+       my $len = delete $self->{input_left};
        my $rbuf = \($self->{rbuf});
        my $input = $env->{'psgi.input'};
 
        while ($len > 0) {
                if ($$rbuf ne '') {
-                       my $w = write_in_full($input, $rbuf, $len);
+                       my $w = xwrite($input, $rbuf, $len);
                        return write_err($self, $len) unless $w;
                        $len -= $w;
                        die "BUG: $len < 0 (w=$w)" if $len < 0;
@@ -146,7 +144,6 @@ sub read_input ($) {
 
 sub app_dispatch {
        my ($self, $input) = @_;
-       $self->watch_read(0);
        my $env = $self->{env};
        $env->{REMOTE_ADDR} = $self->{remote_addr};
        $env->{REMOTE_PORT} = $self->{remote_port};
@@ -210,9 +207,9 @@ sub response_header_write {
        $h .= 'Date: ' . http_date() . "\r\n\r\n";
 
        if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') {
-               more($self, $h);
+               msg_more($self, $h);
        } else {
-               $self->write($h);
+               $self->write(\$h);
        }
        $alive;
 }
@@ -222,12 +219,12 @@ sub chunked_wcb ($) {
        my ($self) = @_;
        sub {
                return if $_[0] eq '';
-               more($self, sprintf("%x\r\n", bytes::length($_[0])));
-               more($self, $_[0]);
+               msg_more($self, sprintf("%x\r\n", bytes::length($_[0])));
+               msg_more($self, $_[0]);
 
-               # use $self->write("\n\n") if you care about real-time
+               # use $self->write(\"\n\n") if you care about real-time
                # streaming responses, public-inbox WWW does not.
-               more($self, "\r\n");
+               msg_more($self, "\r\n");
        }
 }
 
@@ -239,7 +236,7 @@ sub identity_wcb ($) {
 sub next_request ($) {
        my ($self) = @_;
        if ($self->{rbuf} eq '') { # wait for next request
-               $self->watch_read(1);
+               $self->watch_in1;
        } else { # avoid recursion for pipelined requests
                push @$pipelineq, $self;
                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
@@ -249,10 +246,9 @@ sub next_request ($) {
 sub response_done_cb ($$) {
        my ($self, $alive) = @_;
        sub {
-               my $env = $self->{env};
-               $self->{env} = undef;
-               $self->write("0\r\n\r\n") if $alive == 2;
-               $self->write(sub{$alive ? next_request($self) : $self->close});
+               my $env = delete $self->{env};
+               $self->write(\"0\r\n\r\n") if $alive == 2;
+               $self->write($alive ? \&next_request : \&close);
        }
 }
 
@@ -266,9 +262,9 @@ sub getline_cb ($$$) {
                my $buf = eval { $forward->getline };
                if (defined $buf) {
                        $write->($buf); # may close in PublicInbox::DS::write
-                       unless ($self->{closed}) {
+                       if ($self->{sock}) {
                                my $next = $self->{pull};
-                               if (scalar @{$self->{wbuf}}) {
+                               if ($self->{wbuf}) {
                                        $self->write($next);
                                } else {
                                        PublicInbox::EvCleanup::asap($next);
@@ -282,7 +278,7 @@ sub getline_cb ($$$) {
                }
        }
 
-       $self->{forward} = $self->{pull} = undef;
+       delete @$self{qw(forward pull)};
        # avoid recursion
        if ($forward) {
                eval { $forward->close };
@@ -319,21 +315,9 @@ sub response_write {
        }
 }
 
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-sub more ($$) {
-       my $self = $_[0];
-       return if $self->{closed};
-       if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
-               my $n = send($self->{sock}, $_[1], MSG_MORE);
-               if (defined $n) {
-                       my $nlen = length($_[1]) - $n;
-                       return 1 if $nlen == 0; # all done!
-
-                       # PublicInbox::DS::write queues the unwritten substring:
-                       return $self->write(substr($_[1], $n, $nlen));
-               }
-       }
-       $self->write($_[1]);
+sub input_tmpfile ($) {
+       open($_[0], '+>', undef);
+       $_[0]->autoflush(1);
 }
 
 sub input_prepare {
@@ -345,10 +329,10 @@ sub input_prepare {
                        quit($self, 413);
                        return;
                }
-               open($input, '+>', undef);
+               input_tmpfile($input);
        } elsif (env_chunked($env)) {
                $len = CHUNK_START;
-               open($input, '+>', undef);
+               input_tmpfile($input);
        } else {
                $input = $null_io;
        }
@@ -378,46 +362,31 @@ sub write_err {
 sub recv_err {
        my ($self, $r, $len) = @_;
        return $self->close if (defined $r && $r == 0);
-       if ($!{EAGAIN}) {
+       if ($! == EAGAIN) {
                $self->{input_left} = $len;
-               return;
+               return $self->watch_in1;
        }
        err($self, "error reading for input: $! ($len bytes remaining)");
        quit($self, 500);
 }
 
-sub write_in_full {
-       my ($fh, $rbuf, $len) = @_;
-       my $rv = 0;
-       my $off = 0;
-       while ($len > 0) {
-               my $w = syswrite($fh, $$rbuf, $len, $off);
-               return ($rv ? $rv : $w) unless $w; # undef or 0
-               $rv += $w;
-               $off += $w;
-               $len -= $w;
-       }
-       $rv
-}
-
 sub read_input_chunked { # unlikely...
        my ($self) = @_;
        my $input = $self->{env}->{'psgi.input'};
        my $sock = $self->{sock};
-       my $len = $self->{input_left};
-       $self->{input_left} = undef;
+       my $len = delete $self->{input_left};
        my $rbuf = \($self->{rbuf});
 
        while (1) { # chunk start
                if ($len == CHUNK_ZEND) {
                        $$rbuf =~ s/\A\r\n//s and
                                return app_dispatch($self, $input);
-                       return quit($self, 400) if length($$rbuf) > 2;
+                       return quit($self, 400) if bytes::length($$rbuf) > 2;
                }
                if ($len == CHUNK_END) {
                        if ($$rbuf =~ s/\A\r\n//s) {
                                $len = CHUNK_START;
-                       } elsif (length($$rbuf) > 2) {
+                       } elsif (bytes::length($$rbuf) > 2) {
                                return quit($self, 400);
                        }
                }
@@ -427,14 +396,14 @@ sub read_input_chunked { # unlikely...
                                if (($len + -s $input) > $MAX_REQUEST_BUFFER) {
                                        return quit($self, 413);
                                }
-                       } elsif (length($$rbuf) > CHUNK_MAX_HDR) {
+                       } elsif (bytes::length($$rbuf) > CHUNK_MAX_HDR) {
                                return quit($self, 400);
                        }
                        # will break from loop since $len >= 0
                }
 
                if ($len < 0) { # chunk header is trickled, read more
-                       my $off = length($$rbuf);
+                       my $off = bytes::length($$rbuf);
                        my $r = sysread($sock, $$rbuf, 8192, $off);
                        return recv_err($self, $r, $len) unless $r;
                        # (implicit) goto chunk_start if $r > 0;
@@ -444,7 +413,7 @@ sub read_input_chunked { # unlikely...
                # drain the current chunk
                until ($len <= 0) {
                        if ($$rbuf ne '') {
-                               my $w = write_in_full($input, $rbuf, $len);
+                               my $w = xwrite($input, $rbuf, $len);
                                return write_err($self, "$len chunk") if !$w;
                                $len -= $w;
                                if ($len == 0) {
@@ -470,27 +439,34 @@ sub read_input_chunked { # unlikely...
 sub quit {
        my ($self, $status) = @_;
        my $h = "HTTP/1.1 $status " . status_message($status) . "\r\n\r\n";
-       $self->write($h);
+       $self->write(\$h);
        $self->close;
 }
 
 sub close {
-       my $self = shift;
-       my $forward = $self->{forward};
-       my $env = $self->{env};
-       delete $env->{'psgix.io'} if $env; # prevent circular references
-       $self->{pull} = $self->{forward} = $self->{env} = undef;
-       if ($forward) {
+       my $self = $_[0];
+       if (my $env = delete $self->{env}) {
+               delete $env->{'psgix.io'}; # prevent circular references
+       }
+       delete $self->{pull};
+       if (my $forward = delete $self->{forward}) {
                eval { $forward->close };
                err($self, "forward ->close error: $@") if $@;
        }
-       $self->SUPER::close(@_);
+       $self->SUPER::close; # PublicInbox::DS::close
 }
 
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy () {
        my ($self) = @_;
-       ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}}));
+       ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
 }
 
+# fires after pending writes are complete:
+sub restart_pass ($) {
+       $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
+}
+
+sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
+
 1;
index 604627ab0b5678dfa99f43bfccc88af0b0c730b9..b46baeb2804f44a88244b35b7211fabda72da16a 100644 (file)
@@ -11,6 +11,7 @@ use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
 require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
 
 sub new {
        my ($class, $io, $cb, $cleanup) = @_;
@@ -25,18 +26,13 @@ sub new {
 
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
-       $self->SUPER::new($io);
+       $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
        $self->{cb} = $cb;
        $self->{cleanup} = $cleanup;
-       $self->watch_read(1);
        $self;
 }
 
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
-       my ($self) = @_;
-       sub { $self->watch_read(1) }
-}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
 
 sub main_cb ($$$) {
        my ($http, $fh, $bref) = @_;
@@ -44,24 +40,31 @@ sub main_cb ($$$) {
                my ($self) = @_;
                my $r = sysread($self->{sock}, $$bref, 8192);
                if ($r) {
-                       $fh->write($$bref);
-                       unless ($http->{closed}) { # PublicInbox::DS sets this
-                               if (scalar @{$http->{wbuf}}) {
-                                       $self->watch_read(0);
-                                       $http->write(restart_read_cb($self));
+                       $fh->write($$bref); # may call $http->close
+
+                       if ($http->{sock}) { # !closed
+                               if ($http->{wbuf}) {
+                                       # HTTP client could not keep up, so
+                                       # stop reading and buffering.
+                                       $self->watch(0);
+
+                                       # Tell the HTTP socket to restart us
+                                       # when HTTP client is done draining
+                                       # $http->{wbuf}:
+                                       $http->enqueue_restart_pass;
                                }
-                               # stay in watch_read, but let other clients
+                               # stay in EPOLLIN, but let other clients
                                # get some work done, too.
                                return;
                        }
                        # fall through to close below...
                } elsif (!defined $r) {
-                       return if $!{EAGAIN} || $!{EINTR};
+                       return restart_read($self) if $! == EAGAIN;
                }
 
                # Done! Error handling will happen in $fh->close
                # called by the {cleanup} handler
-               $http->{forward} = undef;
+               delete $http->{forward};
                $self->close;
        }
 }
@@ -78,13 +81,14 @@ sub async_pass {
 sub event_step { $_[0]->{cb}->(@_) }
 
 sub close {
-       my $self = shift;
-       my $cleanup = $self->{cleanup};
-       $self->{cleanup} = $self->{cb} = undef;
-       $self->SUPER::close(@_);
+       my $self = $_[0];
+       delete $self->{cb};
+       $self->SUPER::close;
 
        # we defer this to the next timer loop since close is deferred
-       PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+       if (my $cleanup = delete $self->{cleanup}) {
+               PublicInbox::EvCleanup::next_tick($cleanup);
+       }
 }
 
 1;
index 6ee3abb1e27bed242d84ca069e827025e8370e15..94b2aed4db632c597eac684057e26264f80859eb 100644 (file)
@@ -17,8 +17,8 @@ sub new ($$$) {
        listen($s, 1024);
        IO::Handle::blocking($s, 0);
        my $self = fields::new($class);
-       $self->SUPER::new($s, 1); # calls epoll_create for the first socket
-       $self->watch_read(1);
+       $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()|
+                             PublicInbox::DS::EPOLLEXCLUSIVE());
        $self->{post_accept} = $cb;
        $self
 }
index 796ac74d87aa7236e0dfe300124baad77f2a7d78..53e18281ce412d32d1eb120ccf990b6163d8c625 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Each instance of this represents a NNTP client socket
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -14,7 +14,7 @@ use PublicInbox::Git;
 require PublicInbox::EvCleanup;
 use Email::Simple;
 use POSIX qw(strftime);
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+PublicInbox::DS->import(qw(now msg_more));
 use Digest::SHA qw(sha1_hex);
 use Time::Local qw(timegm timelocal);
 use constant {
@@ -24,8 +24,8 @@ use constant {
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
 };
-
-sub now () { clock_gettime(CLOCK_MONOTONIC) };
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use Errno qw(EAGAIN);
 
 my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
 my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -45,30 +45,18 @@ sub next_tick () {
        $nextt = undef;
        my $q = $nextq;
        $nextq = [];
-       foreach my $nntp (@$q) {
-               # for request && response protocols, always finish writing
-               # before finishing reading:
-               if (my $long_cb = $nntp->{long_res}) {
-                       $nntp->write($long_cb);
-               } else {
-                       # pipelined request, we bypassed socket-readiness
-                       # checks to get here:
-                       event_step($nntp);
-
-                       # maybe there's more pipelined data, or we'll have
-                       # to register it for socket-readiness notifications
-                       if (!$nntp->{long_res} && !$nntp->{closed}) {
-                               check_read($nntp);
-                       }
-               }
-       }
+       event_step($_) for @$q;
+}
+
+sub requeue ($) {
+       push @$nextq, $_[0];
+       $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
 }
 
 sub update_idle_time ($) {
        my ($self) = @_;
-        my $sock = $self->{sock} or return;
-       my $fd = fileno($sock);
-       defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
+       my $sock = $self->{sock} or return;
+       $EXPMAP->{fileno($sock)} = [ now(), $self ];
 }
 
 sub expire_old () {
@@ -76,11 +64,17 @@ sub expire_old () {
        my $exp = $EXPTIME;
        my $old = $now - $exp;
        my $nr = 0;
+       my $closed = 0;
        my %new;
        while (my ($fd, $v) = each %$EXPMAP) {
                my ($idle_time, $nntp) = @$v;
                if ($idle_time < $old) {
-                       $nntp->close; # idempotent
+                       if ($nntp->shutdn) {
+                               $closed++;
+                       } else {
+                               ++$nr;
+                               $new{$fd} = $v;
+                       }
                } else {
                        ++$nr;
                        $new{$fd} = $v;
@@ -93,18 +87,28 @@ sub expire_old () {
                $expt = undef;
                # noop to kick outselves out of the loop ASAP so descriptors
                # really get closed
-               PublicInbox::EvCleanup::asap(sub {});
+               PublicInbox::EvCleanup::asap(sub {}) if $closed;
        }
 }
 
+sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
+
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
-       $self->SUPER::new($sock);
+       my $ev = EPOLLIN;
+       my $wbuf;
+       if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+               $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+               $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
+       }
+       $self->SUPER::new($sock, $ev | EPOLLONESHOT);
        $self->{nntpd} = $nntpd;
-       res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
-       $self->{rbuf} = '';
-       $self->watch_read(1);
+       if ($wbuf) {
+               $self->{wbuf} = $wbuf;
+       } else {
+               greet($self);
+       }
        update_idle_time($self);
        $expt ||= PublicInbox::EvCleanup::later(*expire_old);
        $self;
@@ -134,7 +138,7 @@ sub process_line ($$) {
 
        my $res = eval { $req->($self, @args) };
        my $err = $@;
-       if ($err && !$self->{closed}) {
+       if ($err && $self->{sock}) {
                local $/ = "\n";
                chomp($l);
                err($self, 'error from: %s (%s)', $l, $err);
@@ -162,12 +166,12 @@ sub cmd_xgtitle ($;$) {
 
 sub list_overview_fmt ($) {
        my ($self) = @_;
-       do_more($self, $OVERVIEW_FMT);
+       msg_more($self, $OVERVIEW_FMT);
 }
 
 sub list_headers ($;$) {
        my ($self) = @_;
-       do_more($self, $LIST_HEADERS);
+       msg_more($self, $LIST_HEADERS);
 }
 
 sub list_active ($;$) {
@@ -251,7 +255,7 @@ sub parse_time ($$;$) {
        }
        my @now = $gmt ? gmtime : localtime;
        my ($YYYY, $MM, $DD);
-       if (length($date) == 8) { # RFC 3977 allows YYYYMMDD
+       if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD
                ($YYYY, $MM, $DD) = unpack('A4A2A2', $date);
        } else { # legacy clients send YYMMDD
                ($YYYY, $MM, $DD) = unpack('A2A2A2', $date);
@@ -403,7 +407,7 @@ sub cmd_post ($) {
 sub cmd_quit ($) {
        my ($self) = @_;
        res($self, '205 closing connection - goodbye!');
-       $self->close;
+       $self->shutdn;
        undef;
 }
 
@@ -522,8 +526,8 @@ sub simple_body_write ($$) {
        $s->body_set('');
        $body =~ s/^\./../smg;
        $body =~ s/(?<!\r)\n/\r\n/sg;
-       do_more($self, $body);
-       do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+       msg_more($self, $body);
+       msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
        '.'
 }
 
@@ -553,8 +557,8 @@ sub cmd_article ($;$) {
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
        more($self, "220 $n <$mid> article retrieved - head and body follow");
-       do_more($self, _header($s));
-       do_more($self, "\r\n");
+       msg_more($self, _header($s));
+       msg_more($self, "\r\n");
        simple_body_write($self, $s);
 }
 
@@ -565,7 +569,7 @@ sub cmd_head ($;$) {
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
        more($self, "221 $n <$mid> article retrieved - head follows");
-       do_more($self, _header($s));
+       msg_more($self, _header($s));
        '.'
 }
 
@@ -620,48 +624,46 @@ sub get_range ($$) {
 }
 
 sub long_response ($$) {
-       my ($self, $cb) = @_;
-       die "BUG: nested long response" if $self->{long_res};
+       my ($self, $cb) = @_; # cb returns true if more, false if done
 
        my $fd = fileno($self->{sock});
        defined $fd or return;
        # make sure we disable reading during a long response,
        # clients should not be sending us stuff and making us do more
        # work while we are stream a response to them
-       $self->watch_read(0);
        my $t0 = now();
-       $self->{long_res} = sub {
+       my $long_cb; # DANGER: self-referential
+       $long_cb = sub {
+               # wbuf is unset or empty, here; $cb may add to it
                my $more = eval { $cb->() };
-               if ($@ || $self->{closed}) {
-                       $self->{long_res} = undef;
-
+               if ($@ || !$self->{sock}) { # something bad happened...
+                       $long_cb = undef;
+                       my $diff = now() - $t0;
                        if ($@) {
                                err($self,
                                    "%s during long response[$fd] - %0.6f",
-                                   $@, now() - $t0);
-                       }
-                       if ($self->{closed}) {
-                               out($self, " deferred[$fd] aborted - %0.6f",
-                                          now() - $t0);
-                       } else {
-                               update_idle_time($self);
-                               check_read($self);
+                                   $@, $diff);
                        }
-               } elsif ($more) { # scalar @{$self->{wbuf}}:
+                       out($self, " deferred[$fd] aborted - %0.6f", $diff);
+                       $self->close;
+               } elsif ($more) { # $self->{wbuf}:
+                       update_idle_time($self);
+
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
-                       update_idle_time($self);
+                       my $wbuf = $self->{wbuf} ||= [];
+                       push @$wbuf, $long_cb;
 
-                       push @$nextq, $self;
-                       $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+                       # wbuf may be populated by $cb, no need to rearm if so:
+                       requeue($self) if scalar(@$wbuf) == 1;
                } else { # all done!
-                       $self->{long_res} = undef;
-                       check_read($self);
+                       $long_cb = undef;
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+                       requeue($self) unless $self->{wbuf};
                }
        };
-       $self->{long_res}->(); # kick off!
+       $self->write($long_cb); # kick off!
        undef;
 }
 
@@ -765,7 +767,7 @@ sub hdr_searchmsg ($$$$) {
                                $tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
                        }
                        utf8::encode($tmp);
-                       do_more($self, $tmp);
+                       msg_more($self, $tmp);
                        $cur = $msgs->[-1]->{num} + 1;
                });
        }
@@ -904,6 +906,19 @@ sub cmd_xover ($;$) {
        });
 }
 
+sub cmd_starttls ($) {
+       my ($self) = @_;
+       my $sock = $self->{sock} or return;
+       # RFC 4642 2.2.1
+       (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+       my $opt = $self->{nntpd}->{accept_tls} or
+               return '580 can not initiate TLS negotiation';
+       res($self, '382 Continue with TLS negotiation');
+       $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+       requeue($self) if PublicInbox::DS::accept_tls_step($self);
+       undef;
+}
+
 sub cmd_xpath ($$) {
        my ($self, $mid) = @_;
        return r501 unless $mid =~ /\A<(.+)>\z/;
@@ -917,24 +932,14 @@ sub cmd_xpath ($$) {
        '223 '.join(' ', @paths);
 }
 
-sub res ($$) {
-       my ($self, $line) = @_;
-       do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
 
-sub more ($$) {
-       my ($self, $line) = @_;
-       do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
 
 sub do_write ($$) {
-       my ($self, $data) = @_;
-       my $done = $self->write($data);
-       return 0 if $self->{closed};
-
-       # Do not watch for readability if we have data in the queue,
-       # instead re-enable watching for readability when we can
-       $self->watch_read(0) if (!$done || $self->{long_res});
+       my $self = $_[0];
+       my $done = $self->write(\($_[1]));
+       return 0 unless $self->{sock};
 
        $done;
 }
@@ -949,88 +954,53 @@ sub out ($$;@) {
        printf { $self->{nntpd}->{out} } $fmt."\n", @args;
 }
 
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
-       my ($self, $data) = @_;
-       if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
-               my $n = send($self->{sock}, $data, MSG_MORE);
-               if (defined $n) {
-                       my $dlen = length($data);
-                       return 1 if $n == $dlen; # all done!
-                       $data = substr($data, $n, $dlen - $n);
-               }
-       }
-       do_write($self, $data);
-}
-
+# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
 sub event_step {
        my ($self) = @_;
-       return if $self->{closed};
 
-       my $wbuf = $self->{wbuf};
-       if (@$wbuf) {
-               update_idle_time($self);
-               $self->write(undef);
-               return if $self->{closed} || scalar(@$wbuf);
-       }
-       return if $self->{long_res};
+       return unless $self->flush_write && $self->{sock};
+
+       update_idle_time($self);
        # only read more requests if we've drained the write buffer,
        # otherwise we can be buffering infinitely w/o backpressure
 
        use constant LINE_MAX => 512; # RFC 977 section 2.3
-       my $rbuf = \($self->{rbuf});
-       my $r;
+       my $rbuf = $self->{rbuf} // (\(my $x = ''));
+       my $r = 1;
 
        if (index($$rbuf, "\n") < 0) {
-               my $off = length($$rbuf);
-               $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
-               unless (defined $r) {
-                       return if $!{EAGAIN};
-                       return $self->close;
-               }
-               return $self->close if $r == 0;
+               my $off = bytes::length($$rbuf);
+               $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
        }
-       $r = 1;
        while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
                my $line = $1;
                return $self->close if $line =~ /[[:cntrl:]]/s;
                my $t0 = now();
                my $fd = fileno($self->{sock});
                $r = eval { process_line($self, $line) };
-               my $d = $self->{long_res} ?
-                       " deferred[$fd]" : '';
-               out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+               my $pending = $self->{wbuf} ? ' pending' : '';
+               out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
        }
 
        return $self->close if $r < 0;
-       my $len = length($$rbuf);
+       my $len = bytes::length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
-       update_idle_time($self);
-}
-
-sub check_read {
-       my ($self) = @_;
-       if (index($self->{rbuf}, "\n") >= 0) {
-               # Force another read if there is a pipelined request.
-               # We don't know if the socket has anything for us to read,
-               # and we must double-check again by the time the timer fires
-               # in case we really did dispatch a read event and started
-               # another long response.
-               push @$nextq, $self;
-               $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+       if ($len) {
+               $self->{rbuf} = $rbuf;
        } else {
-               # no pipelined requests available, let the kernel know
-               # to wake us up if there's more
-               $self->watch_read(1); # PublicInbox::DS::watch_read
+               delete $self->{rbuf};
        }
+       update_idle_time($self);
+
+       # maybe there's more pipelined data, or we'll have
+       # to register it for socket-readiness notifications
+       requeue($self) unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
        my ($self, $now) = @_;
-        my $sock = $self->{sock} or return;
-       defined(my $fd = fileno($sock)) or return;
-       my $ary = $EXPMAP->{$fd} or return;
+       my $sock = $self->{sock} or return;
+       my $ary = $EXPMAP->{fileno($sock)} or return;
        my $exp_at = $ary->[0] + $EXPTIME;
        $exp_at > $now;
 }
@@ -1038,8 +1008,7 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
        my ($self, $now) = @_;
-       ($self->{rbuf} ne '' || $self->{long_res} ||
-               scalar(@{$self->{wbuf}}) || not_idle_long($self, $now));
+       ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;
index 32848d7c583e28f0a671af1bec55d071bd92d7ea..4f30c5d9aede1cdbedab3a11bf4b84f4edd8c7e5 100644 (file)
@@ -25,6 +25,8 @@ sub new {
                out => \*STDOUT,
                grouplist => [],
                servername => $name,
+               greet => \"201 $name ready - post via email\r\n",
+               # accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... }
        }, $class;
 }
 
index a9f05fc14a9fa86f7b4c18868b13ef4f7bba6e7c..ccc0815ea46d5400ba591d2457aba220a9b2c2f1 100644 (file)
@@ -10,9 +10,8 @@ use fields qw(cb);
 sub new ($$$) {
        my ($class, $pipe, $cb) = @_;
        my $self = fields::new($class);
-       $self->SUPER::new($pipe);
+       $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN());
        $self->{cb} = $cb;
-       $self->watch_read(1);
        $self;
 }
 
index 9aede103521b47d3684032ca29ac6ace7f0a1747..f2630a0f0109a624690edf38b849423b9b0af8dd 100644 (file)
@@ -29,6 +29,9 @@ use warnings;
 use PublicInbox::Spawn qw(popen_rd);
 require Plack::Util;
 
+# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
+use Errno qw(EAGAIN EINTR);
+
 my $def_limiter;
 
 # declares a command to spawn (but does not spawn it).
@@ -122,7 +125,7 @@ sub psgi_qx {
                eval { $qx_cb->($qx) };
                $qx = undef;
        };
-       my $rpipe;
+       my $rpipe; # comes from popen_rd
        my $async = $env->{'pi-httpd.async'};
        my $cb = sub {
                my $r = sysread($rpipe, my $buf, 8192);
@@ -131,13 +134,13 @@ sub psgi_qx {
                } elsif (defined $r) {
                        $r ? $qx->write($buf) : $end->();
                } else {
-                       return if $!{EAGAIN} || $!{EINTR}; # loop again
+                       return if $! == EAGAIN || $! == EINTR; # loop again
                        $end->();
                }
        };
        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
        $self->start($limiter, sub { # may run later, much later...
-               ($rpipe) = @_;
+               ($rpipe) = @_; # popen_rd result
                if ($async) {
                # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
                        $async = $async->($rpipe, $cb, $end);
@@ -193,7 +196,7 @@ sub psgi_return {
        my $buf = '';
        my $rd_hdr = sub {
                my $r = sysread($rpipe, $buf, 1024, length($buf));
-               return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+               return if !defined($r) && $! == EAGAIN || $! == EINTR;
                $parse_hdr->($r, \$buf);
        };
 
index 66b916dfbc72d60c28c57cdddd63a88d1b8d7966..9161bb5b2f8dab4f1f50fe4817535d87028907c7 100644 (file)
@@ -229,8 +229,6 @@ sub popen_rd {
        my ($cmd, $env, $opts) = @_;
        pipe(my ($r, $w)) or die "pipe: $!\n";
        $opts ||= {};
-       my $blocking = $opts->{Blocking};
-       IO::Handle::blocking($r, $blocking) if defined $blocking;
        $opts->{1} = fileno($w);
        my $pid = spawn($cmd, $env, $opts);
        return unless defined $pid;
index 4ef64cc3ede3c81584fd051a5f94ed61d13f0805..500efa67016f80149f3723d08f0e11f5e8af71fa 100644 (file)
@@ -22,22 +22,24 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION);
 $VERSION     = "0.25";
 @ISA         = qw(Exporter);
 @EXPORT_OK   = qw(sendfile epoll_ctl epoll_create epoll_wait
-                  EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND
+                  EPOLLIN EPOLLOUT
                   EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
-                  EPOLLEXCLUSIVE);
+                  EPOLLONESHOT EPOLLEXCLUSIVE);
 %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
-                             EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND
+                             EPOLLIN EPOLLOUT
                              EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
-                             EPOLLEXCLUSIVE)],
+                             EPOLLONESHOT EPOLLEXCLUSIVE)],
                 sendfile => [qw(sendfile)],
                 );
 
 use constant EPOLLIN       => 1;
 use constant EPOLLOUT      => 4;
-use constant EPOLLERR      => 8;
-use constant EPOLLHUP      => 16;
-use constant EPOLLRDBAND   => 128;
+use constant EPOLLERR      => 8;
+use constant EPOLLHUP      => 16;
+use constant EPOLLRDBAND   => 128;
 use constant EPOLLEXCLUSIVE => (1 << 28);
+use constant EPOLLONESHOT => (1 << 30);
+# use constant EPOLLET => (1 << 31);
 use constant EPOLL_CTL_ADD => 1;
 use constant EPOLL_CTL_DEL => 2;
 use constant EPOLL_CTL_MOD => 3;
@@ -57,7 +59,6 @@ sub _load_syscall {
     return $rv;
 }
 
-our ($sysname, $nodename, $release, $version, $machine) = POSIX::uname();
 
 our (
      $SYS_epoll_create,
@@ -69,6 +70,7 @@ our (
 our $no_deprecated = 0;
 
 if ($^O eq "linux") {
+    my $machine = (POSIX::uname())[-1];
     # whether the machine requires 64-bit numbers to be on 8-byte
     # boundaries.
     my $u64_mod_8 = 0;
@@ -246,13 +248,8 @@ sub sendfile_freebsd {
 
 sub epoll_defined { return $SYS_epoll_create ? 1 : 0; }
 
-# ARGS: (size) -- but in modern Linux 2.6, the
-# size doesn't even matter (radix tree now, not hash)
 sub epoll_create {
-    return -1 unless defined $SYS_epoll_create;
-    my $epfd = eval { syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0) };
-    return -1 if $@;
-    return $epfd;
+       syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0);
 }
 
 # epoll_ctl wrapper
diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm
new file mode 100644 (file)
index 0000000..576c11d
--- /dev/null
@@ -0,0 +1,24 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# IO::Socket::SSL support code
+package PublicInbox::TLS;
+use strict;
+use IO::Socket::SSL;
+require Carp;
+use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLOUT);
+
+sub err () { $SSL_ERROR }
+
+# returns the EPOLL event bit which matches the existing SSL error
+sub epollbit () {
+       if ($! == EAGAIN) {
+               return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
+               return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
+               die "unexpected SSL error: $SSL_ERROR";
+       }
+       0;
+}
+
+1;
index 484ce8d677a731858a80d7414acd0634970d3b7a..55bf330efef59b1b9699ae0e49f0bda4d2682460 100755 (executable)
@@ -11,4 +11,5 @@ require PublicInbox::NNTPD;
 my $nntpd = PublicInbox::NNTPD->new;
 PublicInbox::Daemon::run('0.0.0.0:119',
        sub { $nntpd->refresh_groups }, # refresh
-       sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }); # post_accept
+       sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }, # post_accept
+       $nntpd);
diff --git a/t/ds-poll.t b/t/ds-poll.t
new file mode 100644 (file)
index 0000000..a397ee0
--- /dev/null
@@ -0,0 +1,58 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+#  <https://www.gnu.org/licenses/gpl-1.0.txt>
+#  <https://dev.perl.org/licenses/artistic.html>
+use strict;
+use warnings;
+use Test::More;
+use PublicInbox::Syscall qw(:epoll);
+my $cls = 'PublicInbox::DSPoll';
+use_ok $cls;
+my $p = $cls->new;
+
+my ($r, $w, $x, $y);
+pipe($r, $w) or die;
+pipe($x, $y) or die;
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($r), EPOLLIN), 0, 'add EPOLLIN');
+my $events = [];
+my $n = epoll_wait($p, 9, 0, $events);
+is_deeply($events, [], 'no events set');
+is($n, 0, 'nothing ready, yet');
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($w), EPOLLOUT|EPOLLONESHOT), 0,
+       'add EPOLLOUT|EPOLLONESHOT');
+$n = epoll_wait($p, 9, -1, $events);
+is($n, 1, 'got POLLOUT event');
+is($events->[0]->[0], fileno($w), '$w ready');
+
+$n = epoll_wait($p, 9, 0, $events);
+is($n, 0, 'nothing ready after oneshot');
+is_deeply($events, [], 'no events set after oneshot');
+
+syswrite($w, '1') == 1 or die;
+for my $t (0..1) {
+       $n = epoll_wait($p, 9, $t, $events);
+       is($events->[0]->[0], fileno($r), "level-trigger POLLIN ready #$t");
+       is($n, 1, "only event ready #$t");
+}
+syswrite($y, '1') == 1 or die;
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($x), EPOLLIN|EPOLLONESHOT), 0,
+       'EPOLLIN|EPOLLONESHOT add');
+is(epoll_wait($p, 9, -1, $events), 2, 'epoll_wait has 2 ready');
+my @fds = sort(map { $_->[0] } @$events);
+my @exp = sort((fileno($r), fileno($x)));
+is_deeply(\@fds, \@exp, 'got both ready FDs');
+
+# EPOLL_CTL_DEL doesn't matter for kqueue, we do it in native epoll
+# to avoid a kernel-wide lock; but its not needed for native kqueue
+# paths so DSKQXS makes it a noop (as did Danga::Socket::close).
+SKIP: {
+       if ($cls ne 'PublicInbox::DSPoll') {
+               skip "$cls doesn't handle EPOLL_CTL_DEL", 2;
+       }
+       is(epoll_ctl($p, EPOLL_CTL_DEL, fileno($r), 0), 0, 'EPOLL_CTL_DEL OK');
+       $n = epoll_wait($p, 9, 0, $events);
+       is($n, 0, 'nothing ready after EPOLL_CTL_DEL');
+};
+
+done_testing;
index c1dc77dba9a3a5b89fa8420e9c66af962fa19e82..1cfc2565c09fd3c3aa7056cd50473ffef5f77ce8 100644 (file)
@@ -18,7 +18,7 @@ use File::Temp qw/tempdir/;
 use IO::Socket;
 use IO::Socket::UNIX;
 use Fcntl qw(:seek);
-use Socket qw(IPPROTO_TCP TCP_NODELAY);
+use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);
 use POSIX qw(mkfifo);
 require './t/common.perl';
 my $tmpdir = tempdir('httpd-corner-XXXXXX', TMPDIR => 1, CLEANUP => 1);
@@ -36,6 +36,25 @@ my %opts = (
        Listen => 1024,
 );
 my $sock = IO::Socket::INET->new(%opts);
+
+# Make sure we don't clobber socket options set by systemd or similar
+# using socket activation:
+my ($defer_accept_val, $accf_arg);
+if ($^O eq 'linux') {
+       setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die;
+       my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
+       defined $x or die "getsockopt: $!";
+       $defer_accept_val = unpack('i', $x);
+       if ($defer_accept_val <= 0) {
+               die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val";
+       }
+} elsif ($^O eq 'freebsd' && system('kldstat -m accf_data >/dev/null') == 0) {
+       require PublicInbox::Daemon;
+       my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+       $accf_arg = pack('a16a240', 'dataready', '');
+       setsockopt($sock, SOL_SOCKET, $var, $accf_arg) or die "setsockopt: $!";
+}
+
 my $upath = "$tmpdir/s";
 my $unix = IO::Socket::UNIX->new(
        Listen => 1024,
@@ -89,7 +108,7 @@ my $spawn_httpd = sub {
        is(scalar(grep(/CLOSE FAIL/, @$after)), 1, 'body->close not called');
 }
 
-{
+SKIP: {
        my $conn = conn_for($sock, 'excessive header');
        $SIG{PIPE} = 'IGNORE';
        $conn->write("GET /callback HTTP/1.0\r\n");
@@ -497,6 +516,21 @@ SKIP: {
        is($body, sha1_hex(''), 'read expected body #2');
 }
 
+SKIP: {
+       skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux';
+       my $var = Socket::TCP_DEFER_ACCEPT();
+       defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
+       is(unpack('i', $x), $defer_accept_val,
+               'TCP_DEFER_ACCEPT unchanged if previously set');
+};
+SKIP: {
+       skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd';
+       skip 'accf_data not loaded: kldload accf_data' if !defined $accf_arg;
+       my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+       defined(my $x = getsockopt($sock, SOL_SOCKET, $var)) or die;
+       is($x, $accf_arg, 'SO_ACCEPTFILTER unchanged if previously set');
+};
+
 done_testing();
 
 sub capture {
index c061031cc662f7b63d6d9c4fffb2caebf2fdc926..e085c4b9f502b2bbdae5828ff379ddb332fcbfd9 100644 (file)
--- a/t/httpd.t
+++ b/t/httpd.t
@@ -10,6 +10,7 @@ foreach my $mod (qw(Plack::Util Plack::Builder HTTP::Date HTTP::Status)) {
 }
 use File::Temp qw/tempdir/;
 use IO::Socket::INET;
+use Socket qw(IPPROTO_TCP);
 require './t/common.perl';
 
 # FIXME: too much setup
@@ -99,6 +100,23 @@ EOF
                'fsck on cloned directory successful');
 }
 
+SKIP: {
+       skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux';
+       my $var = Socket::TCP_DEFER_ACCEPT();
+       defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
+       ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set');
+};
+SKIP: {
+       skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd';
+       if (system('kldstat -m accf_http >/dev/null') != 0) {
+               skip 'accf_http not loaded: kldload accf_http', 1;
+       }
+       require PublicInbox::Daemon;
+       my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+       my $x = getsockopt($sock, SOL_SOCKET, $var);
+       like($x, qr/\Ahttpready\0+\z/, 'got httpready accf for HTTP');
+};
+
 done_testing();
 
 1;
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
new file mode 100644 (file)
index 0000000..427d370
--- /dev/null
@@ -0,0 +1,224 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw(tempdir);
+use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET);
+# IO::Poll and Net::NNTP are part of the standard library, but
+# distros may split them off...
+foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) {
+       eval "require $mod";
+       plan skip_all => "$mod missing for $0" if $@;
+}
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+unless (-r $key && -r $cert) {
+       plan skip_all =>
+               "certs/ missing for $0, run ./create-certs.perl in certs/";
+}
+
+use_ok 'PublicInbox::TLS';
+use_ok 'IO::Socket::SSL';
+require './t/common.perl';
+require PublicInbox::InboxWritable;
+require PublicInbox::MIME;
+require PublicInbox::SearchIdx;
+my $version = 2; # v2 needs newer git
+require_git('2.6') if $version >= 2;
+my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $mainrepo = "$tmpdir";
+my $pi_config = "$tmpdir/pi_config";
+my $group = 'test-nntpd-tls';
+my $addr = $group . '@example.com';
+my $nntpd = 'blib/script/public-inbox-nntpd';
+my %opts = (
+       LocalAddr => '127.0.0.1',
+       ReuseAddr => 1,
+       Proto => 'tcp',
+       Type => SOCK_STREAM,
+       Listen => 1024,
+);
+my $starttls = IO::Socket::INET->new(%opts);
+my $nntps = IO::Socket::INET->new(%opts);
+my ($pid, $tail_pid);
+END {
+       foreach ($pid, $tail_pid) {
+               kill 'TERM', $_ if defined $_;
+       }
+};
+
+my $ibx = PublicInbox::Inbox->new({
+       mainrepo => $mainrepo,
+       name => 'nntpd-tls',
+       version => $version,
+       -primary_address => $addr,
+       indexlevel => 'basic',
+});
+$ibx = PublicInbox::InboxWritable->new($ibx, {nproc=>1});
+$ibx->init_inbox(0);
+{
+       open my $fh, '>', $pi_config or die "open: $!\n";
+       print $fh <<EOF
+[publicinbox "nntpd-tls"]
+       mainrepo = $mainrepo
+       address = $addr
+       indexlevel = basic
+       newsgroup = $group
+EOF
+       ;
+       close $fh or die "close: $!\n";
+}
+
+{
+       my $im = $ibx->importer(0);
+       my $mime = PublicInbox::MIME->new(do {
+               open my $fh, '<', 't/data/0001.patch' or die;
+               local $/;
+               <$fh>
+       });
+       ok($im->add($mime), 'message added');
+       $im->done;
+       if ($version == 1) {
+               my $s = PublicInbox::SearchIdx->new($ibx, 1);
+               $s->index_sync;
+       }
+}
+
+my $nntps_addr = $nntps->sockhost . ':' . $nntps->sockport;
+my $starttls_addr = $starttls->sockhost . ':' . $starttls->sockport;
+my $env = { PI_CONFIG => $pi_config };
+
+for my $args (
+       [ "--cert=$cert", "--key=$key",
+               "-lnntps://$nntps_addr",
+               "-lnntp://$starttls_addr" ],
+) {
+       for ($out, $err) {
+               open my $fh, '>', $_ or die "truncate: $!";
+       }
+       if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+               $tail_pid = fork;
+               if (defined $tail_pid && $tail_pid == 0) {
+                       exec(split(' ', $tail_cmd), $out, $err);
+               }
+       }
+       my $cmd = [ $nntpd, '-W0', @$args, "--stdout=$out", "--stderr=$err" ];
+       $pid = spawn_listener($env, $cmd, [ $starttls, $nntps ]);
+       my %o = (
+               SSL_hostname => 'server.local',
+               SSL_verifycn_name => 'server.local',
+               SSL_verify_mode => SSL_VERIFY_PEER(),
+               SSL_ca_file => 'certs/test-ca.pem',
+       );
+       my $expect = { $group => [qw(1 1 n)] };
+
+       # start negotiating a slow TLS connection
+       my $slow = IO::Socket::INET->new(
+               Proto => 'tcp',
+               PeerAddr => $nntps_addr,
+               Type => SOCK_STREAM,
+               Blocking => 0,
+       );
+       $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o);
+       my $slow_done = $slow->connect_SSL;
+       diag('W: connect_SSL early OK, slow client test invalid') if $slow_done;
+       my @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+       # we should call connect_SSL much later...
+
+       # NNTPS
+       my $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+       my $list = $c->list;
+       is_deeply($list, $expect, 'NNTPS LIST works');
+       is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works');
+       is(0, sysread($c, my $buf, 1), 'got EOF after QUIT');
+
+       # STARTTLS
+       $c = Net::NNTP->new($starttls_addr, %o);
+       $list = $c->list;
+       is_deeply($list, $expect, 'plain LIST works');
+       ok($c->starttls, 'STARTTLS succeeds');
+       is($c->code, 382, 'got 382 for STARTTLS');
+       $list = $c->list;
+       is_deeply($list, $expect, 'LIST works after STARTTLS');
+
+       # Net::NNTP won't let us do dumb things, but we need to test
+       # dumb things, so use Net::Cmd directly:
+       my $n = $c->command('STARTTLS')->response();
+       is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again');
+       is($c->code, 502, '502 according to RFC 4642 sec#2.2.1');
+
+       # STARTTLS with bad hostname
+       $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.invalid';
+       $c = Net::NNTP->new($starttls_addr, %o);
+       $list = $c->list;
+       is_deeply($list, $expect, 'plain LIST works again');
+       ok(!$c->starttls, 'STARTTLS fails with bad hostname');
+       $c = Net::NNTP->new($starttls_addr, %o);
+       $list = $c->list;
+       is_deeply($list, $expect, 'not broken after bad negotiation');
+
+       # NNTPS with bad hostname
+       $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+       is($c, undef, 'NNTPS fails with bad hostname');
+       $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local';
+       $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+       ok($c, 'NNTPS succeeds again with valid hostname');
+
+       # slow TLS connection did not block the other fast clients while
+       # connecting, finish it off:
+       until ($slow_done) {
+               IO::Poll::_poll(-1, @poll);
+               $slow_done = $slow->connect_SSL and last;
+               @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+       }
+       $slow->blocking(1);
+       ok(sysread($slow, my $greet, 4096) > 0, 'slow got greeting');
+       like($greet, qr/\A201 /, 'got expected greeting');
+       is(syswrite($slow, "QUIT\r\n"), 6, 'slow wrote QUIT');
+       ok(sysread($slow, my $end, 4096) > 0, 'got EOF');
+       is(sysread($slow, my $eof, 4096), 0, 'got EOF');
+       $slow = undef;
+
+       SKIP: {
+               skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux';
+               my $var = Socket::TCP_DEFER_ACCEPT();
+               defined(my $x = getsockopt($nntps, IPPROTO_TCP, $var)) or die;
+               ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on NNTPS');
+               defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die;
+               is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP');
+       };
+       SKIP: {
+               skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd';
+               if (system('kldstat -m accf_data >/dev/null')) {
+                       skip 'accf_data not loaded? kldload accf_data', 2;
+               }
+               require PublicInbox::Daemon;
+               my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+               my $x = getsockopt($nntps, SOL_SOCKET, $var);
+               like($x, qr/\Adataready\0+\z/, 'got dataready accf for NNTPS');
+               $x = getsockopt($starttls, IPPROTO_TCP, $var);
+               is($x, undef, 'no BSD accept filter for plain NNTP');
+       };
+
+       $c = undef;
+       kill('TERM', $pid);
+       is($pid, waitpid($pid, 0), 'nntpd exited successfully');
+       is($?, 0, 'no error in exited process');
+       $pid = undef;
+       my $eout = eval {
+               open my $fh, '<', $err or die "open $err failed: $!";
+               local $/;
+               <$fh>;
+       };
+       unlike($eout, qr/wide/i, 'no Wide character warnings');
+       if (defined $tail_pid) {
+               kill 'TERM', $tail_pid;
+               waitpid($tail_pid, 0);
+               $tail_pid = undef;
+       }
+}
+done_testing();
+1;
index c37880bf1c486c2974c74021fdaed00d3b83e56b..6cba2be435089207d7ad8a33d76b23f11d417461 100644 (file)
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -106,6 +106,8 @@ EOF
        is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works');
        is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works');
        is_deeply($n->listgroup($group), [1], 'listgroup OK');
+       ok(!$n->starttls, 'STARTTLS fails when unconfigured');
+       is($n->code, 580, 'got 580 code on server w/o TLS');
 
        %opts = (
                PeerAddr => $host_port,
index 884042824265b59717d4ca1bc9a6096f50f0807a..1d71b26df1f342b5c1faca7f0ae1ceeee06e53d2 100644 (file)
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -81,17 +81,6 @@ use PublicInbox::Spawn qw(which spawn popen_rd);
        isnt($?, 0, '$? set properly: '.$?);
 }
 
-{
-       my ($fh, $pid) = popen_rd([qw(sleep 60)], undef, { Blocking => 0 });
-       ok(defined $pid && $pid > 0, 'returned pid when array requested');
-       is(kill(0, $pid), 1, 'child process is running');
-       ok(!defined(sysread($fh, my $buf, 1)) && $!{EAGAIN},
-          'sysread returned quickly with EAGAIN');
-       is(kill(9, $pid), 1, 'child process killed early');
-       is(waitpid($pid, 0), $pid, 'child process reapable');
-       isnt($?, 0, '$? set properly: '.$?);
-}
-
 SKIP: {
        eval {
                require BSD::Resource;