Makefile.PL
README
TODO
+certs/.gitignore
+certs/create-certs.perl
ci/README
ci/deps.perl
ci/profiles.sh
lib/PublicInbox/Config.pm
lib/PublicInbox/ContentId.pm
lib/PublicInbox/DS.pm
+lib/PublicInbox/DSKQXS.pm
+lib/PublicInbox/DSPoll.pm
lib/PublicInbox/Daemon.pm
lib/PublicInbox/Emergency.pm
lib/PublicInbox/EvCleanup.pm
lib/PublicInbox/Spawn.pm
lib/PublicInbox/SpawnPP.pm
lib/PublicInbox/Syscall.pm
+lib/PublicInbox/TLS.pm
lib/PublicInbox/Unsubscribe.pm
lib/PublicInbox/UserContent.pm
lib/PublicInbox/V2Writable.pm
t/convert-compact.t
t/data/0001.patch
t/ds-leak.t
+t/ds-poll.t
t/edit.t
t/emergency.t
t/fail-bin/spamc
t/msgmap.t
t/msgtime.t
t/nntp.t
+t/nntpd-tls.t
t/nntpd.t
t/nulsubject.t
t/over.t
%.syntax ::
@\$(PERL) -w -I lib -c \$(subst .syntax,,\$@)
-syntax:: \$(my_syntax)
+syntax:: \$(filter-out lib/PublicInbox/DSKQXS.pm.syntax,\$(my_syntax))
dsyn :: \$(addsuffix .syntax, \$(filter \$(changed), \$(syn_files)))
--- /dev/null
+*.pem
+*.der
+*.enc
+*.p12
--- /dev/null
+#!/usr/bin/perl -w
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# from IO::Socket::SSL 2.063 / https://github.com/noxxi/p5-io-socket-ssl
+use strict;
+use warnings;
+use IO::Socket::SSL::Utils;
+use Net::SSLeay;
+
+my $dir = "./";
+my $now = time();
+my $later = 0x7fffffff; # 2038 problems on 32-bit :<
+
+Net::SSLeay::SSLeay_add_ssl_algorithms();
+my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die;
+my $printfp = sub {
+ my ($w,$cert) = @_;
+ print $w.' sha256$'.unpack('H*',Net::SSLeay::X509_digest($cert, $sha256))."\n"
+};
+
+my %time_valid = (not_before => $now, not_after => $later);
+
+my @ca = CERT_create(
+ CA => 1,
+ subject => { CN => 'IO::Socket::SSL Demo CA' },
+ %time_valid,
+);
+save('test-ca.pem',PEM_cert2string($ca[0]));
+
+my @server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ %time_valid,
+);
+save('server-cert.pem',PEM_cert2string($server[0]));
+save('server-key.pem',PEM_key2string($server[1]));
+$printfp->(server => $server[0]);
+
+@server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server2.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ %time_valid,
+);
+save('server2-cert.pem',PEM_cert2string($server[0]));
+save('server2-key.pem',PEM_key2string($server[1]));
+$printfp->(server2 => $server[0]);
+
+@server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server-ecc.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ key => KEY_create_ec(),
+ %time_valid,
+);
+save('server-ecc-cert.pem',PEM_cert2string($server[0]));
+save('server-ecc-key.pem',PEM_key2string($server[1]));
+$printfp->('server-ecc' => $server[0]);
+
+
+my @client = CERT_create(
+ CA => 0,
+ subject => { CN => 'client.local' },
+ purpose => 'client',
+ issuer => \@ca,
+ %time_valid,
+);
+save('client-cert.pem',PEM_cert2string($client[0]));
+save('client-key.pem',PEM_key2string($client[1]));
+$printfp->(client => $client[0]);
+
+my @swc = CERT_create(
+ CA => 0,
+ subject => { CN => 'server.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ subjectAltNames => [
+ [ DNS => '*.server.local' ],
+ [ IP => '127.0.0.1' ],
+ [ DNS => 'www*.other.local' ],
+ [ DNS => 'smtp.mydomain.local' ],
+ [ DNS => 'xn--lwe-sna.idntest.local' ]
+ ],
+ %time_valid,
+);
+save('server-wildcard.pem',PEM_cert2string($swc[0]),PEM_key2string($swc[1]));
+
+
+my @subca = CERT_create(
+ CA => 1,
+ issuer => \@ca,
+ subject => { CN => 'IO::Socket::SSL Demo Sub CA' },
+ %time_valid,
+);
+save('test-subca.pem',PEM_cert2string($subca[0]));
+@server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server.local' },
+ purpose => 'server',
+ issuer => \@subca,
+ %time_valid,
+);
+save('sub-server.pem',PEM_cert2string($server[0]).PEM_key2string($server[1]));
+
+
+
+my @cap = CERT_create(
+ CA => 1,
+ subject => { CN => 'IO::Socket::SSL::Intercept' },
+ %time_valid,
+);
+save('proxyca.pem',PEM_cert2string($cap[0]).PEM_key2string($cap[1]));
+
+sub save {
+ my $file = shift;
+ open(my $fd,'>',$dir.$file) or die $!;
+ print $fd @_;
+}
+
+system(<<CMD);
+cd $dir
+set -x
+openssl x509 -in server-cert.pem -out server-cert.der -outform der
+openssl rsa -in server-key.pem -out server-key.der -outform der
+openssl rsa -in server-key.pem -out server-key.enc -passout pass:bluebell
+openssl rsa -in client-key.pem -out client-key.enc -passout pass:opossum
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server.p12 -passout pass:
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server_enc.p12 -passout pass:bluebell
+CMD
use strict;
use bytes;
use POSIX ();
-use Time::HiRes ();
use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
-
+use Fcntl qw(SEEK_SET :DEFAULT);
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+use parent qw(Exporter);
+our @EXPORT_OK = qw(now msg_more);
use warnings;
+use 5.010_001;
use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
- 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write
+ 'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
- 'closed', # bool: socket is closed
- 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
);
-use Errno qw(EAGAIN EINVAL);
-use Carp qw(croak confess);
-
-use constant DebugLevel => 0;
-
-use constant POLLIN => 1;
-use constant POLLOUT => 4;
-use constant POLLERR => 8;
-use constant POLLHUP => 16;
-use constant POLLNVAL => 32;
-
-our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
+use Errno qw(EAGAIN EINVAL EEXIST);
+use Carp qw(croak confess carp);
+require File::Spec;
our (
- $HaveEpoll, # Flag -- is epoll available? initially undefined.
- $HaveKQueue,
%DescriptorMap, # fd (num) -> PublicInbox::DS object
- $Epoll, # Global epoll fd (for epoll mode only)
- $KQueue, # Global kqueue fd ref (for kqueue mode only)
+ $Epoll, # Global epoll fd (or DSKQXS ref)
$_io, # IO::Handle for Epoll
@ToClose, # sockets to close when event loop is done
@Timers, # timers
);
-# this may be set to zero with old kernels
-our $EPOLLEXCLUSIVE = EPOLLEXCLUSIVE;
Reset();
#####################################################################
$PostLoopCallback = undef;
$DoneInit = 0;
- # NOTE kqueue is close-on-fork, and we don't account for it, yet
- # OTOH, we (public-inbox) don't need this sub outside of tests...
- POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0;
- $KQueue = undef;
-
- $_io = undef; # close $Epoll
- $Epoll = undef;
+ $_io = undef; # closes real $Epoll FD
+ $Epoll = undef; # may call DSKQXS::DESTROY
*EventLoop = *FirstTimeEventLoop;
}
return $LoopTimeout = $_[1] + 0;
}
-=head2 C<< CLASS->DebugMsg( $format, @args ) >>
-
-Print the debugging message specified by the C<sprintf>-style I<format> and
-I<args>
-
-=cut
-sub DebugMsg {
- my ( $class, $fmt, @args ) = @_;
- chomp $fmt;
- printf STDERR ">>> $fmt\n", @args;
-}
-
=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
Add a timer to occur $seconds from now. $seconds may be fractional, but timers
=cut
sub AddTimer {
- my $class = shift;
- my ($secs, $coderef) = @_;
+ my ($class, $secs, $coderef) = @_;
- my $fire_time = Time::HiRes::time() + $secs;
+ if (!$secs) {
+ my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
+ unshift(@Timers, $timer);
+ return $timer;
+ }
+
+ my $fire_time = now() + $secs;
my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
return if $DoneInit;
$DoneInit = 1;
- if ($HAVE_KQUEUE) {
- $KQueue = IO::KQueue->new();
- $HaveKQueue = defined $KQueue;
- if ($HaveKQueue) {
- *EventLoop = *KQueueEventLoop;
- }
- }
- elsif (PublicInbox::Syscall::epoll_defined()) {
- $Epoll = eval { epoll_create(1024); };
- $HaveEpoll = defined $Epoll && $Epoll >= 0;
- if ($HaveEpoll) {
- set_cloexec($Epoll);
- *EventLoop = *EpollEventLoop;
+ if (PublicInbox::Syscall::epoll_defined()) {
+ $Epoll = epoll_create();
+ set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+ } else {
+ my $cls;
+ for (qw(DSKQXS DSPoll)) {
+ $cls = "PublicInbox::$_";
+ last if eval "require $cls";
}
+ $cls->import;
+ $Epoll = $cls->new;
}
-
- if (!$HaveEpoll && !$HaveKQueue) {
- require IO::Poll;
- *EventLoop = *PollEventLoop;
- }
+ *EventLoop = *EpollEventLoop;
}
=head2 C<< CLASS->EventLoop() >>
_InitPoller();
- if ($HaveEpoll) {
- EpollEventLoop($class);
- } elsif ($HaveKQueue) {
- KQueueEventLoop($class);
- } else {
- PollEventLoop($class);
- }
+ EventLoop($class);
}
+sub now () { clock_gettime(CLOCK_MONOTONIC) }
+
# runs timers and returns milliseconds for next one, or next event loop
sub RunTimers {
return $LoopTimeout unless @Timers;
- my $now = Time::HiRes::time();
+ my $now = now();
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
return $timeout;
}
-### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
-### okay.
sub EpollEventLoop {
- my $class = shift;
-
while (1) {
my @events;
my $i;
}
return unless PostEventLoop();
}
- exit 0;
-}
-
-### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
-### IO::Epoll fails to load.
-sub PollEventLoop {
- my $class = shift;
-
- my PublicInbox::DS $pob;
-
- while (1) {
- my $timeout = RunTimers();
-
- # the following sets up @poll as a series of ($poll,$event_mask)
- # items, then uses IO::Poll::_poll, implemented in XS, which
- # modifies the array in place with the even elements being
- # replaced with the event masks that occured.
- my @poll;
- while ( my ($fd, $sock) = each %DescriptorMap ) {
- push @poll, $fd, $sock->{event_watch};
- }
-
- # if nothing to poll, either end immediately (if no timeout)
- # or just keep calling the callback
- unless (@poll) {
- select undef, undef, undef, ($timeout / 1000);
- return unless PostEventLoop();
- next;
- }
-
- my $count = IO::Poll::_poll($timeout, @poll);
- unless ($count >= 0) {
- return unless PostEventLoop();
- next;
- }
-
- # Fetch handles with read events
- while (@poll) {
- my ($fd, $state) = splice(@poll, 0, 2);
- $DescriptorMap{$fd}->event_step if $state;
- }
-
- return unless PostEventLoop();
- }
-
- exit 0;
-}
-
-### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
-### okay.
-sub KQueueEventLoop {
- my $class = shift;
-
- while (1) {
- my $timeout = RunTimers();
- my @ret = eval { $KQueue->kevent($timeout) };
- if (my $err = $@) {
- # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
- if ($err =~ /Interrupted system call/) {
- @ret = ();
- } else {
- die $err;
- }
- }
-
- foreach my $kev (@ret) {
- $DescriptorMap{$kev->[0]}->event_step;
- }
- return unless PostEventLoop();
- }
-
- exit(0);
}
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
while (my $sock = shift @ToClose) {
my $fd = fileno($sock);
- # close the socket. (not a PublicInbox::DS close)
- $sock->close;
+ # close the socket. (not a PublicInbox::DS close)
+ CORE::close($sock);
# and now we can finally remove the fd from the map. see
- # comment above in _cleanup.
+ # comment above in ->close.
delete $DescriptorMap{$fd};
}
=cut
sub new {
- my ($self, $sock, $exclusive) = @_;
+ my ($self, $sock, $ev) = @_;
$self = fields::new($self) unless ref $self;
$self->{sock} = $sock;
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{wbuf} = [];
- $self->{wbuf_off} = 0;
- $self->{closed} = 0;
-
- my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
-
_InitPoller();
- if ($HaveEpoll) {
- if ($exclusive) {
- $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP|$EPOLLEXCLUSIVE;
+ if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
+ if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
+ $ev &= ~EPOLLEXCLUSIVE;
+ goto retry;
}
-retry:
- if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
- if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) {
- $EPOLLEXCLUSIVE = 0; # old kernel
- $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP;
- goto retry;
- }
- die "couldn't add epoll watch for $fd: $!\n";
- }
- }
- elsif ($HaveKQueue) {
- # Add them to the queue but disabled for now
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
- IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
- IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
+ die "couldn't add epoll watch for $fd: $!\n";
}
-
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
if $DescriptorMap{$fd};
=cut
sub close {
- my PublicInbox::DS $self = $_[0];
- return if $self->{closed};
-
- # this does most of the work of closing us
- $self->_cleanup();
-
- # defer closing the actual socket until the event loop is done
- # processing this round of events. (otherwise we might reuse fds)
- if (my $sock = delete $self->{sock}) {
- push @ToClose, $sock;
- }
-
- return 0;
-}
-
-### METHOD: _cleanup()
-### Called by our closers so we can clean internal data structures.
-sub _cleanup {
- my PublicInbox::DS $self = $_[0];
-
- # we're effectively closed; we have no fd and sock when we leave here
- $self->{closed} = 1;
+ my ($self) = @_;
+ my $sock = delete $self->{sock} or return;
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
# preventing the object from being destroyed
- @{$self->{wbuf}} = ();
+ delete $self->{wbuf};
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
- if ($HaveEpoll && $self->{sock}) {
- my $fd = fileno($self->{sock});
- epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
- confess("EPOLL_CTL_DEL: $!");
- }
+ my $fd = fileno($sock);
+ epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
+ confess("EPOLL_CTL_DEL: $!");
# we explicitly don't delete from DescriptorMap here until we
# actually close the socket, as we might be in the middle of
# processing an epoll_wait/etc that returned hundreds of fds, one
# of which is not yet processed and is what we're closing. if we
# keep it in DescriptorMap, then the event harnesses can just
- # looked at $pob->{closed} and ignore it. but if it's an
+ # looked at $pob->{sock} == undef and ignore it. but if it's an
# un-accounted for fd, then it (understandably) freak out a bit
# and emit warnings, thinking their state got off.
+
+ # defer closing the actual socket until the event loop is done
+ # processing this round of events. (otherwise we might reuse fds)
+ push @ToClose, $sock;
+
+ return 0;
}
-=head2 C<< $obj->sock() >>
+# portable, non-thread-safe sendfile emulation (no pread, yet)
+sub psendfile ($$$) {
+ my ($sock, $fh, $off) = @_;
+
+ seek($fh, $$off, SEEK_SET) or return;
+ defined(my $to_write = read($fh, my $buf, 16384)) or return;
+ my $written = 0;
+ while ($to_write > 0) {
+ if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
+ $written += $w;
+ $to_write -= $w;
+ } else {
+ return if $written == 0;
+ last;
+ }
+ }
+ $$off += $written;
+ $written;
+}
-Returns the underlying IO::Handle for the object.
+# returns 1 if done, 0 if incomplete
+sub flush_write ($) {
+ my ($self) = @_;
+ my $wbuf = $self->{wbuf} or return 1;
+ my $sock = $self->{sock};
+
+next_buf:
+ while (my $bref = $wbuf->[0]) {
+ if (ref($bref) ne 'CODE') {
+ my $off = delete($self->{wbuf_off}) // 0;
+ while ($sock) {
+ my $w = psendfile($sock, $bref, \$off);
+ if (defined $w) {
+ if ($w == 0) {
+ shift @$wbuf;
+ goto next_buf;
+ }
+ } elsif ($! == EAGAIN) {
+ $self->{wbuf_off} = $off;
+ watch($self, EPOLLOUT|EPOLLONESHOT);
+ return 0;
+ } else {
+ return $self->close;
+ }
+ }
+ } else { #($ref eq 'CODE') {
+ shift @$wbuf;
+ my $before = scalar(@$wbuf);
+ $bref->($self);
-=cut
-sub sock {
- my PublicInbox::DS $self = shift;
- return $self->{sock};
+ # bref may be enqueueing more CODE to call (see accept_tls_step)
+ return 0 if (scalar(@$wbuf) > $before);
+ }
+ } # while @$wbuf
+
+ delete $self->{wbuf};
+ 1; # all done
+}
+
+sub do_read ($$$$) {
+ my ($self, $rbuf, $len, $off) = @_;
+ my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+ return ($r == 0 ? $self->close : $r) if defined $r;
+ # common for clients to break connections without warning,
+ # would be too noisy to log here:
+ if (ref($self) eq 'IO::Socket::SSL') {
+ my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+ watch($self, $ev | EPOLLONESHOT);
+ } elsif ($! == EAGAIN) {
+ watch($self, EPOLLIN | EPOLLONESHOT);
+ } else {
+ $self->close;
+ }
+}
+
+# drop the socket if we hit unrecoverable errors on our system which
+# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop {
+ my $self = shift;
+ carp(@_);
+ $self->close;
+}
+
+# n.b.: use ->write/->read for this buffer to allow compatibility with
+# PerlIO::mmap or PerlIO::scalar if needed
+sub tmpio ($$$) {
+ my ($self, $bref, $off) = @_;
+ my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
+ do {
+ my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
+ if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
+ unlink($fn) or return drop($self, "unlink($fn) $!");
+ } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
+ return drop($self, "open: $!");
+ }
+ } until (defined $fh);
+ $fh->autoflush(1);
+ my $len = bytes::length($$bref) - $off;
+ $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
+ $fh
}
=head2 C<< $obj->write( $data ) >>
Write the specified data to the underlying handle. I<data> may be scalar,
-scalar ref, code ref (to run when there), or undef just to kick-start.
+scalar ref, code ref (to run when there).
Returns 1 if writes all went through, or 0 if there are writes in queue. If
it returns 1, caller should stop waiting for 'writable' events)
=cut
sub write {
- my PublicInbox::DS $self;
- my $data;
- ($self, $data) = @_;
+ my ($self, $data) = @_;
# nobody should be writing to closed sockets, but caller code can
# do two writes within an event, have the first fail and
# now-dead object does its second write. that is this case. we
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
- return 1 if $self->{closed};
-
- my $bref;
-
- # just queue data if there's already a wait
- my $need_queue;
+ my $sock = $self->{sock} or return 1;
+ my $ref = ref $data;
+ my $bref = $ref ? $data : \$data;
my $wbuf = $self->{wbuf};
-
- if (defined $data) {
- $bref = ref $data ? $data : \$data;
- if (scalar @$wbuf) {
+ if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more...
+ if ($ref eq 'CODE') {
push @$wbuf, $bref;
- return 0;
- }
-
- # this flag says we're bypassing the queue system, knowing we're the
- # only outstanding write, and hoping we don't ever need to use it.
- # if so later, though, we'll need to queue
- $need_queue = 1;
- }
-
- WRITE:
- while (1) {
- return 1 unless $bref ||= $wbuf->[0];
-
- my $len;
- eval {
- $len = length($$bref); # this will die if $bref is a code ref, caught below
- };
- if ($@) {
- if (UNIVERSAL::isa($bref, "CODE")) {
- unless ($need_queue) {
- shift @$wbuf;
- }
- $bref->();
-
- # code refs are just run and never get reenqueued
- # (they're one-shot), so turn off the flag indicating the
- # outstanding data needs queueing.
- $need_queue = 0;
-
- undef $bref;
- next WRITE;
+ } else {
+ my $last = $wbuf->[-1];
+ if (ref($last) eq 'GLOB') { # append to tmp file buffer
+ $last->print($$bref) or return drop($self, "print: $!");
+ } else {
+ my $tmpio = tmpio($self, $bref, 0) or return 0;
+ push @$wbuf, $tmpio;
}
- die "Write error: $@ <$bref>";
}
-
- my $to_write = $len - $self->{wbuf_off};
- my $written = syswrite($self->{sock}, $$bref, $to_write,
- $self->{wbuf_off});
-
- if (! defined $written) {
- if ($! == EAGAIN) {
- # since connection has stuff to write, it should now be
- # interested in pending writes:
- if ($need_queue) {
- push @$wbuf, $bref;
- }
- $self->watch_write(1);
- return 0;
- }
-
+ return 0;
+ } elsif ($ref eq 'CODE') {
+ $bref->($self);
+ return 1;
+ } else {
+ my $to_write = bytes::length($$bref);
+ my $written = syswrite($sock, $$bref, $to_write);
+
+ if (defined $written) {
+ return 1 if $written == $to_write;
+ } elsif ($! == EAGAIN) {
+ $written = 0;
+ } else {
return $self->close;
- } elsif ($written != $to_write) {
- if ($need_queue) {
- push @$wbuf, $bref;
- }
- # since connection has stuff to write, it should now be
- # interested in pending writes:
- $self->{wbuf_off} += $written;
- $self->on_incomplete_write;
- return 0;
- } elsif ($written == $to_write) {
- $self->{wbuf_off} = 0;
- $self->watch_write(0);
-
- # this was our only write, so we can return immediately
- # since we avoided incrementing the buffer size or
- # putting it in the buffer. we also know there
- # can't be anything else to write.
- return 1 if $need_queue;
-
- shift @$wbuf;
- undef $bref;
- next WRITE;
}
- }
-}
-
-sub on_incomplete_write {
- my PublicInbox::DS $self = shift;
- $self->watch_write(1);
-}
-
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
+ my $tmpio = tmpio($self, $bref, $written) or return 0;
-=cut
-sub watch_read {
- my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
-
- my $val = shift;
- my $event = $self->{event_watch};
-
- $event &= ~POLLIN if ! $val;
- $event |= POLLIN if $val;
-
- my $fd = fileno($self->{sock});
- # If it changed, set it
- if ($event != $self->{event_watch}) {
- if ($HaveKQueue) {
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
- $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
- }
- elsif ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
- confess("EPOLL_CTL_MOD: $!");
- }
- $self->{event_watch} = $event;
+ # wbuf may be an empty array if we're being called inside
+ # ->flush_write via CODE bref:
+ push @{$self->{wbuf} ||= []}, $tmpio;
+ watch($self, EPOLLOUT|EPOLLONESHOT);
+ return 0;
}
}
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write {
- my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
-
- my $val = shift;
- my $event = $self->{event_watch};
-
- $event &= ~POLLOUT if ! $val;
- $event |= POLLOUT if $val;
- my $fd = fileno($self->{sock});
-
- # If it changed, set it
- if ($event != $self->{event_watch}) {
- if ($HaveKQueue) {
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
- $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
- }
- elsif ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
- confess "EPOLL_CTL_MOD: $!";
+use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
+
+sub msg_more ($$) {
+ my $self = $_[0];
+ my $sock = $self->{sock} or return 1;
+
+ if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
+ my $n = send($sock, $_[1], MSG_MORE);
+ if (defined $n) {
+ my $nlen = bytes::length($_[1]) - $n;
+ return 1 if $nlen == 0; # all done!
+ # queue up the unwritten substring:
+ my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+ $self->{wbuf} = [ $tmpio ];
+ watch($self, EPOLLOUT|EPOLLONESHOT);
+ return 0;
}
- $self->{event_watch} = $event;
}
+ $self->write(\($_[1]));
}
-=head2 C<< $obj->dump_error( $message ) >>
-
-Prints to STDERR a backtrace with information about this socket and what lead
-up to the dump_error call.
+sub watch ($$) {
+ my ($self, $ev) = @_;
+ my $sock = $self->{sock} or return;
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
+ confess("EPOLL_CTL_MOD $!");
+ 0;
+}
-=cut
-sub dump_error {
- my $i = 0;
- my @list;
- while (my ($file, $line, $sub) = (caller($i++))[1..3]) {
- push @list, "\t$file:$line called $sub\n";
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
+
+# return true if complete, false if incomplete (or failure)
+sub accept_tls_step ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return 1 if $sock->accept_SSL;
+ return $self->close if $! != EAGAIN;
+ if (my $ev = PublicInbox::TLS::epollbit()) {
+ unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ return watch($self, $ev | EPOLLONESHOT);
}
-
- warn "ERROR: $_[1]\n" .
- "\t$_[0] = " . $_[0]->as_string . "\n" .
- join('', @list);
+ drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
}
-=head2 C<< $obj->debugmsg( $format, @args ) >>
-
-Print the debugging message specified by the C<sprintf>-style I<format> and
-I<args>.
-
-=cut
-sub debugmsg {
- my ( $self, $fmt, @args ) = @_;
- confess "Not an object" unless ref $self;
-
- chomp $fmt;
- printf STDERR ">>> $fmt\n", @args;
+sub shutdn_tls_step ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
+ return $self->close if $! != EAGAIN;
+ if (my $ev = PublicInbox::TLS::epollbit()) {
+ unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+ return watch($self, $ev | EPOLLONESHOT);
+ }
+ drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
}
-=head2 C<< $obj->as_string() >>
-
-Returns a string describing this socket.
-
-=cut
-sub as_string {
- my PublicInbox::DS $self = shift;
- my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
- ($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
- my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
- return $ret;
+# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
+# or fork w/o exec, so no inadvertant socket sharing
+sub shutdn ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ if (ref($sock) eq 'IO::Socket::SSL') {
+ shutdn_tls_step($self);
+ } else {
+ $self->close;
+ }
}
package PublicInbox::DS::Timer;
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+#
+# kqueue support via IO::KQueue XS module. This makes kqueue look
+# like epoll to simplify the code in DS.pm. This is NOT meant to be
+# an all encompassing emulation of epoll via IO::KQueue, but just to
+# support cases public-inbox-nntpd/httpd care about.
+# A pure-Perl version using syscall() is planned, and it should be
+# faster due to the lack of syscall overhead.
+package PublicInbox::DSKQXS;
+use strict;
+use warnings;
+use parent qw(IO::KQueue);
+use parent qw(Exporter);
+use IO::KQueue;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
+
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD | EV_ENABLE;
+ ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
+ } else {
+ EV_ADD | EV_DISABLE;
+ }
+}
+
+sub new {
+ my ($class) = @_;
+ die 'non-singleton use not supported' if $owner_pid == $$;
+ $owner_pid = $$;
+ $class->SUPER::new;
+}
+
+sub epoll_ctl {
+ my ($self, $op, $fd, $ev) = @_;
+ if ($op != EPOLL_CTL_DEL) {
+ $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev));
+ $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev));
+ }
+ 0;
+}
+
+sub epoll_wait {
+ my ($self, $maxevents, $timeout_msec, $events) = @_;
+ @$events = eval { $self->kevent($timeout_msec) };
+ if (my $err = $@) {
+ # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
+ if ($err =~ /Interrupted system call/) {
+ @$events = ();
+ } else {
+ die $err;
+ }
+ }
+ # caller only cares for $events[$i]->[0]
+ scalar(@$events);
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ if ($owner_pid == $$) {
+ POSIX::close($$self);
+ $owner_pid = -1;
+ }
+}
+
+1;
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+#
+# poll(2) via IO::Poll core module. This makes poll look
+# like epoll to simplify the code in DS.pm. This is NOT meant to be
+# an all encompassing emulation of epoll via IO::Poll, but just to
+# support cases public-inbox-nntpd/httpd care about.
+package PublicInbox::DSPoll;
+use strict;
+use warnings;
+use parent qw(Exporter);
+use IO::Poll;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+
+sub new { bless {}, $_[0] } # fd => events
+
+sub epoll_ctl {
+ my ($self, $op, $fd, $ev) = @_;
+
+ # not wasting time on error checking
+ if ($op != EPOLL_CTL_DEL) {
+ $self->{$fd} = $ev;
+ } else {
+ delete $self->{$fd};
+ }
+ 0;
+}
+
+sub epoll_wait {
+ my ($self, $maxevents, $timeout_msec, $events) = @_;
+ my @pset;
+ while (my ($fd, $events) = each %$self) {
+ my $pevents = $events & EPOLLIN ? POLLIN : 0;
+ $pevents |= $events & EPOLLOUT ? POLLOUT : 0;
+ push(@pset, $fd, $pevents);
+ }
+ @$events = ();
+ my $n = IO::Poll::_poll($timeout_msec, @pset);
+ if ($n >= 0) {
+ for (my $i = 0; $i < @pset; ) {
+ my $fd = $pset[$i++];
+ my $revents = $pset[$i++] or next;
+ delete($self->{$fd}) if $self->{$fd} & EPOLLONESHOT;
+ push @$events, [ $fd ];
+ }
+ my $nevents = scalar @$events;
+ if ($n != $nevents) {
+ warn "BUG? poll() returned $n, but got $nevents";
+ }
+ }
+ $n;
+}
+
+1;
use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
use IO::Handle;
use IO::Socket;
+use Socket qw(IPPROTO_TCP SOL_SOCKET);
+sub SO_ACCEPTFILTER () { 0x1000 }
use Cwd qw/abs_path/;
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
STDOUT->autoflush(1);
STDERR->autoflush(1);
-require PublicInbox::DS;
+use PublicInbox::DS qw(now);
require PublicInbox::EvCleanup;
require POSIX;
require PublicInbox::Listener;
my $worker_processes = 1;
my @listeners;
my %pids;
-my %listener_names;
+my %listener_names; # sockname => IO::Handle
+my %tls_opt; # scheme://sockname => args for IO::Socket::SSL->start_SSL
my $reexec_pid;
my $cleanup;
my ($uid, $gid);
+my ($default_cert, $default_key);
END { $cleanup->() if $cleanup };
+my %KNOWN_TLS = ( 443 => 'https', 563 => 'nntps' );
+my %KNOWN_STARTTLS = ( 119 => 'nntp' );
+
+sub accept_tls_opt ($) {
+ my ($opt_str) = @_;
+ # opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value)
+ require PublicInbox::TLS;
+ my $o = {};
+ # allow ',' as delimiter since '&' is shell-unfriendly
+ foreach (split(/[,&]/, $opt_str)) {
+ my ($k, $v) = split(/=/, $_, 2);
+ push @{$o->{$k} ||= []}, $v;
+ }
+
+ # key may be a part of cert. At least
+ # p5-io-socket-ssl/example/ssl_server.pl has this fallback:
+ $o->{cert} //= [ $default_cert ];
+ $o->{key} //= defined($default_key) ? [ $default_key ] : $o->{cert};
+ my %ctx_opt = (SSL_server => 1);
+ # parse out hostname:/path/to/ mappings:
+ foreach my $k (qw(cert key)) {
+ my $x = $ctx_opt{'SSL_'.$k.'_file'} = {};
+ foreach my $path (@{$o->{$k}}) {
+ my $host = '';
+ $path =~ s/\A([^:]+):// and $host = $1;
+ $x->{$host} = $path;
+ }
+ }
+ my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or
+ die 'SSL_Context->new: '.PublicInbox::TLS::err();
+
+ # save ~34K per idle connection (cf. SSL_CTX_set_mode(3ssl))
+ # RSS goes from 346MB to 171MB with 10K idle NNTPS clients on amd64
+ # cf. https://rt.cpan.org/Ticket/Display.html?id=129463
+ my $mode = eval { Net::SSLeay::MODE_RELEASE_BUFFERS() };
+ if ($mode && $ctx->{context}) {
+ eval { Net::SSLeay::CTX_set_mode($ctx->{context}, $mode) };
+ warn "W: $@ (setting SSL_MODE_RELEASE_BUFFERS)\n" if $@;
+ }
+
+ { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
+}
sub daemon_prepare ($) {
my ($default_listen) = @_;
'u|user=s' => \$user,
'g|group=s' => \$group,
'D|daemonize' => \$daemonize,
+ 'cert=s' => \$default_cert,
+ 'key=s' => \$default_key,
);
GetOptions(%opts) or die "bad command-line args\n";
die "--pid-file cannot end with '.oldbin'\n";
}
@listeners = inherit();
+
+ # allow socket-activation users to set certs once and not
+ # have to configure each socket:
+ my @inherited_names = keys(%listener_names) if defined($default_cert);
+
# ignore daemonize when inheriting
$daemonize = undef if scalar @listeners;
push @cfg_listen, $default_listen unless (@listeners || @cfg_listen);
foreach my $l (@cfg_listen) {
+ my $orig = $l;
+ my $scheme = '';
+ if ($l =~ s!\A([^:]+)://!!) {
+ $scheme = $1;
+ } elsif ($l =~ /\A(?:\[[^\]]+\]|[^:]+):([0-9])+/) {
+ my $s = $KNOWN_TLS{$1} // $KNOWN_STARTTLS{$1};
+ $scheme = $s if defined $s;
+ }
+ if ($l =~ s!/?\?(.+)\z!!) {
+ $tls_opt{"$scheme://$l"} = accept_tls_opt($1);
+ } elsif (defined($default_cert)) {
+ $tls_opt{"$scheme://$l"} = accept_tls_opt('');
+ } elsif ($scheme =~ /\A(?:nntps|https)\z/) {
+ die "$orig specified w/o cert=\n";
+ }
+ # TODO: use scheme to load either NNTP.pm or HTTP.pm
+
next if $listener_names{$l}; # already inherited
my (%o, $sock_pkg);
if (index($l, '/') == 0) {
push @listeners, $s;
}
}
+
+ # cert/key options in @cfg_listen takes precedence when inheriting,
+ # but map well-known inherited ports if --listen isn't specified
+ # at all
+ for my $sockname (@inherited_names) {
+ $sockname =~ /:([0-9]+)\z/ or next;
+ if (my $scheme = $KNOWN_TLS{$1}) {
+ $tls_opt{"$scheme://$sockname"} ||= accept_tls_opt('');
+ } elsif (($scheme = $KNOWN_STARTTLS{$1})) {
+ next if $tls_opt{"$scheme://$sockname"};
+ $tls_opt{''} ||= accept_tls_opt('');
+ }
+ }
+
die "No listeners bound\n" unless @listeners;
}
PublicInbox::DS->SetPostLoopCallback(sub {
my ($dmap, undef) = @_;
my $n = 0;
- my $now = clock_gettime(CLOCK_MONOTONIC);
+ my $now = now();
foreach my $s (values %$dmap) {
$s->can('busy') or next;
}
}
if ($n) {
- if (($warn + 5) < time) {
+ if (($warn + 5) < now()) {
warn "$$ quitting, $n client(s) left\n";
- $warn = time;
+ $warn = now();
}
unless (defined $proc_name) {
$proc_name = (split(/\s+/, $0))[0];
exit # never gets here, just for documentation
}
-sub daemon_loop ($$) {
- my ($refresh, $post_accept) = @_;
+sub tls_start_cb ($$) {
+ my ($opt, $orig_post_accept) = @_;
+ sub {
+ my ($io, $addr, $srv) = @_;
+ my $ssl = IO::Socket::SSL->start_SSL($io, %$opt);
+ $orig_post_accept->($ssl, $addr, $srv);
+ }
+}
+
+sub defer_accept ($$) {
+ my ($s, $af_name) = @_;
+ return unless defined $af_name;
+ if ($^O eq 'linux') {
+ my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
+ return unless defined $x; # may be Unix socket
+ my $sec = unpack('i', $x);
+ return if $sec > 0; # systemd users may set a higher value
+ setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1);
+ } elsif ($^O eq 'freebsd') {
+ my $x = getsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER);
+ return if defined $x; # don't change if set
+ my $accf_arg = pack('a16a240', $af_name, '');
+ setsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER, $accf_arg);
+ }
+}
+
+sub daemon_loop ($$$$) {
+ my ($refresh, $post_accept, $nntpd, $af_default) = @_;
PublicInbox::EvCleanup::enable(); # early for $refresh
+ my %post_accept;
+ while (my ($k, $v) = each %tls_opt) {
+ if ($k =~ s!\A(?:nntps|https)://!!) {
+ $post_accept{$k} = tls_start_cb($v, $post_accept);
+ } elsif ($nntpd) { # STARTTLS, $k eq '' is OK
+ $nntpd->{accept_tls} = $v;
+ }
+ }
my $parent_pipe;
if ($worker_processes > 0) {
$refresh->(); # preload by default
$SIG{HUP} = $refresh;
$SIG{CHLD} = 'DEFAULT';
$SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH);
- # this calls epoll_create:
- @listeners = map {
- PublicInbox::Listener->new($_, $post_accept)
+ @listeners = map {;
+ my $tls_cb = $post_accept{sockname($_)};
+
+ # NNTPS, HTTPS, HTTP, and POP3S are client-first traffic
+ # NNTP and POP3 are server-first
+ defer_accept($_, $tls_cb ? 'dataready' : $af_default);
+
+ # this calls epoll_create:
+ PublicInbox::Listener->new($_, $tls_cb || $post_accept)
} @listeners;
PublicInbox::DS->EventLoop;
$parent_pipe = undef;
}
-sub run ($$$) {
- my ($default, $refresh, $post_accept) = @_;
+sub run ($$$;$) {
+ my ($default, $refresh, $post_accept, $nntpd) = @_;
daemon_prepare($default);
+ my $af_default = $default =~ /:8080\z/ ? 'httpready' : undef;
daemonize();
- daemon_loop($refresh, $post_accept);
+ daemon_loop($refresh, $post_accept, $nntpd, $af_default);
}
sub do_chown ($) {
use strict;
use warnings;
use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
my $ENABLED;
sub enabled { $ENABLED }
# fires in the next event loop iteration.
pipe($r, $w) or die "pipe: $!";
fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
- $self->SUPER::new($w);
+ $self->SUPER::new($w, 0);
# always writable, since PublicInbox::EvCleanup::event_step
# never drains wbuf. We can avoid wasting a hash slot by
# stuffing the read-end of the pipe into the never-to-be-touched
# wbuf
- push @{$self->{wbuf}}, $r;
+ $self->{wbuf} = $r;
$self;
}
# ensure PublicInbox::DS::ToClose processing after timers fire
sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-sub _run_asap () { _run_all($asapq) }
+# Called by PublicInbox::DS
+sub event_step { _run_all($asapq) }
+
sub _run_next () {
_run_all($nextq);
_asap_close();
_asap_close();
}
-# Called by PublicInbox::DS
-sub event_step {
- my ($self) = @_;
- $self->watch_write(0);
- _run_asap();
-}
-
sub _asap_timer () {
$singleton ||= once_init();
- $singleton->watch_write(1);
+ $singleton->watch(EPOLLOUT|EPOLLONESHOT);
1;
}
}
END {
- _run_asap();
+ event_step();
_run_all($nextq);
_run_all($laterq);
}
return;
}
last if $r == 0;
- my $off = 0;
- while ($r > 0) {
- my $w = syswrite($in, $buf, $r, $off);
- if (defined $w) {
- $r -= $w;
- $off += $w;
- } else {
- err($env, "error writing temporary file: $!");
- return;
- }
+ unless (print $in $buf) {
+ err($env, "error writing temporary file: $!");
+ return;
}
}
+ # ensure it's visible to git-http-backend(1):
+ unless ($in->flush) {
+ err($env, "error writing temporary file: $!");
+ return;
+ }
unless (defined(sysseek($in, 0, SEEK_SET))) {
err($env, "error seeking temporary file: $!");
return;
use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
+PublicInbox::DS->import(qw(msg_more));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
CHUNK_ZEND => -3, # \r\n
CHUNK_MAX_HDR => 256,
};
+use Errno qw(EAGAIN);
my $pipelineq = [];
my $pipet;
$pipet = undef;
$pipelineq = [];
foreach (@$q) {
- next if $_->{closed};
+ next unless $_->{sock};
rbuf_process($_);
}
}
sub new ($$$) {
my ($class, $sock, $addr, $httpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{httpd} = $httpd;
$self->{rbuf} = '';
($self->{remote_addr}, $self->{remote_port}) =
PublicInbox::Daemon::host_with_port($addr);
- $self->watch_read(1);
$self;
}
sub event_step { # called by PublicInbox::DS
my ($self) = @_;
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- $self->write(undef);
- return if $self->{closed} || scalar(@$wbuf);
- }
+ return unless $self->flush_write && $self->{sock};
+
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
return read_input($self) if defined $self->{env};
-
- my $off = length($self->{rbuf});
- my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off);
- if (defined $r) {
- return $self->close if $r == 0;
- return rbuf_process($self);
- }
- return if $!{EAGAIN}; # no need to call watch_read(1) again
-
- # common for clients to break connections without warning,
- # would be too noisy to log here:
- return $self->close;
+ my $rbuf = \($self->{rbuf});
+ my $off = bytes::length($$rbuf);
+ $self->do_read($rbuf, 8192, $off) and rbuf_process($self);
}
sub rbuf_process {
# (they are rarely-used and git (as of 2.7.2) does not use them)
if ($r == -1 || $env{HTTP_TRAILER} ||
# this length-check is necessary for PURE_PERL=1:
- ($r == -2 && length($self->{rbuf}) > 0x4000)) {
+ ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) {
return quit($self, 400);
}
- return $self->watch_read(1) if $r < 0; # incomplete
+ return $self->watch_in1 if $r < 0; # incomplete
$self->{rbuf} = substr($self->{rbuf}, $r);
my $len = input_prepare($self, \%env);
$len ? read_input($self) : app_dispatch($self);
}
+# IO::Handle::write returns boolean, this returns bytes written:
+sub xwrite ($$$) {
+ my ($fh, $rbuf, $max) = @_;
+ my $w = bytes::length($$rbuf);
+ $w = $max if $w > $max;
+ $fh->write($$rbuf, $w) or return;
+ $w;
+}
+
sub read_input ($) {
my ($self) = @_;
my $env = $self->{env};
# env->{CONTENT_LENGTH} (identity)
my $sock = $self->{sock};
- my $len = $self->{input_left};
- $self->{input_left} = undef;
+ my $len = delete $self->{input_left};
my $rbuf = \($self->{rbuf});
my $input = $env->{'psgi.input'};
while ($len > 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len);
+ my $w = xwrite($input, $rbuf, $len);
return write_err($self, $len) unless $w;
$len -= $w;
die "BUG: $len < 0 (w=$w)" if $len < 0;
sub app_dispatch {
my ($self, $input) = @_;
- $self->watch_read(0);
my $env = $self->{env};
$env->{REMOTE_ADDR} = $self->{remote_addr};
$env->{REMOTE_PORT} = $self->{remote_port};
$h .= 'Date: ' . http_date() . "\r\n\r\n";
if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') {
- more($self, $h);
+ msg_more($self, $h);
} else {
- $self->write($h);
+ $self->write(\$h);
}
$alive;
}
my ($self) = @_;
sub {
return if $_[0] eq '';
- more($self, sprintf("%x\r\n", bytes::length($_[0])));
- more($self, $_[0]);
+ msg_more($self, sprintf("%x\r\n", bytes::length($_[0])));
+ msg_more($self, $_[0]);
- # use $self->write("\n\n") if you care about real-time
+ # use $self->write(\"\n\n") if you care about real-time
# streaming responses, public-inbox WWW does not.
- more($self, "\r\n");
+ msg_more($self, "\r\n");
}
}
sub next_request ($) {
my ($self) = @_;
if ($self->{rbuf} eq '') { # wait for next request
- $self->watch_read(1);
+ $self->watch_in1;
} else { # avoid recursion for pipelined requests
push @$pipelineq, $self;
$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
sub response_done_cb ($$) {
my ($self, $alive) = @_;
sub {
- my $env = $self->{env};
- $self->{env} = undef;
- $self->write("0\r\n\r\n") if $alive == 2;
- $self->write(sub{$alive ? next_request($self) : $self->close});
+ my $env = delete $self->{env};
+ $self->write(\"0\r\n\r\n") if $alive == 2;
+ $self->write($alive ? \&next_request : \&close);
}
}
my $buf = eval { $forward->getline };
if (defined $buf) {
$write->($buf); # may close in PublicInbox::DS::write
- unless ($self->{closed}) {
+ if ($self->{sock}) {
my $next = $self->{pull};
- if (scalar @{$self->{wbuf}}) {
+ if ($self->{wbuf}) {
$self->write($next);
} else {
PublicInbox::EvCleanup::asap($next);
}
}
- $self->{forward} = $self->{pull} = undef;
+ delete @$self{qw(forward pull)};
# avoid recursion
if ($forward) {
eval { $forward->close };
}
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-sub more ($$) {
- my $self = $_[0];
- return if $self->{closed};
- if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
- my $n = send($self->{sock}, $_[1], MSG_MORE);
- if (defined $n) {
- my $nlen = length($_[1]) - $n;
- return 1 if $nlen == 0; # all done!
-
- # PublicInbox::DS::write queues the unwritten substring:
- return $self->write(substr($_[1], $n, $nlen));
- }
- }
- $self->write($_[1]);
+sub input_tmpfile ($) {
+ open($_[0], '+>', undef);
+ $_[0]->autoflush(1);
}
sub input_prepare {
quit($self, 413);
return;
}
- open($input, '+>', undef);
+ input_tmpfile($input);
} elsif (env_chunked($env)) {
$len = CHUNK_START;
- open($input, '+>', undef);
+ input_tmpfile($input);
} else {
$input = $null_io;
}
sub recv_err {
my ($self, $r, $len) = @_;
return $self->close if (defined $r && $r == 0);
- if ($!{EAGAIN}) {
+ if ($! == EAGAIN) {
$self->{input_left} = $len;
- return;
+ return $self->watch_in1;
}
err($self, "error reading for input: $! ($len bytes remaining)");
quit($self, 500);
}
-sub write_in_full {
- my ($fh, $rbuf, $len) = @_;
- my $rv = 0;
- my $off = 0;
- while ($len > 0) {
- my $w = syswrite($fh, $$rbuf, $len, $off);
- return ($rv ? $rv : $w) unless $w; # undef or 0
- $rv += $w;
- $off += $w;
- $len -= $w;
- }
- $rv
-}
-
sub read_input_chunked { # unlikely...
my ($self) = @_;
my $input = $self->{env}->{'psgi.input'};
my $sock = $self->{sock};
- my $len = $self->{input_left};
- $self->{input_left} = undef;
+ my $len = delete $self->{input_left};
my $rbuf = \($self->{rbuf});
while (1) { # chunk start
if ($len == CHUNK_ZEND) {
$$rbuf =~ s/\A\r\n//s and
return app_dispatch($self, $input);
- return quit($self, 400) if length($$rbuf) > 2;
+ return quit($self, 400) if bytes::length($$rbuf) > 2;
}
if ($len == CHUNK_END) {
if ($$rbuf =~ s/\A\r\n//s) {
$len = CHUNK_START;
- } elsif (length($$rbuf) > 2) {
+ } elsif (bytes::length($$rbuf) > 2) {
return quit($self, 400);
}
}
if (($len + -s $input) > $MAX_REQUEST_BUFFER) {
return quit($self, 413);
}
- } elsif (length($$rbuf) > CHUNK_MAX_HDR) {
+ } elsif (bytes::length($$rbuf) > CHUNK_MAX_HDR) {
return quit($self, 400);
}
# will break from loop since $len >= 0
}
if ($len < 0) { # chunk header is trickled, read more
- my $off = length($$rbuf);
+ my $off = bytes::length($$rbuf);
my $r = sysread($sock, $$rbuf, 8192, $off);
return recv_err($self, $r, $len) unless $r;
# (implicit) goto chunk_start if $r > 0;
# drain the current chunk
until ($len <= 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len);
+ my $w = xwrite($input, $rbuf, $len);
return write_err($self, "$len chunk") if !$w;
$len -= $w;
if ($len == 0) {
sub quit {
my ($self, $status) = @_;
my $h = "HTTP/1.1 $status " . status_message($status) . "\r\n\r\n";
- $self->write($h);
+ $self->write(\$h);
$self->close;
}
sub close {
- my $self = shift;
- my $forward = $self->{forward};
- my $env = $self->{env};
- delete $env->{'psgix.io'} if $env; # prevent circular references
- $self->{pull} = $self->{forward} = $self->{env} = undef;
- if ($forward) {
+ my $self = $_[0];
+ if (my $env = delete $self->{env}) {
+ delete $env->{'psgix.io'}; # prevent circular references
+ }
+ delete $self->{pull};
+ if (my $forward = delete $self->{forward}) {
eval { $forward->close };
err($self, "forward ->close error: $@") if $@;
}
- $self->SUPER::close(@_);
+ $self->SUPER::close; # PublicInbox::DS::close
}
# for graceful shutdown in PublicInbox::Daemon:
sub busy () {
my ($self) = @_;
- ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}}));
+ ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
}
+# fires after pending writes are complete:
+sub restart_pass ($) {
+ $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
+}
+
+sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
+
1;
use base qw(PublicInbox::DS);
use fields qw(cb cleanup);
require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
sub new {
my ($class, $io, $cb, $cleanup) = @_;
my $self = fields::new($class);
IO::Handle::blocking($io, 0);
- $self->SUPER::new($io);
+ $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
$self->{cb} = $cb;
$self->{cleanup} = $cleanup;
- $self->watch_read(1);
$self;
}
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
- my ($self) = @_;
- sub { $self->watch_read(1) }
-}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
sub main_cb ($$$) {
my ($http, $fh, $bref) = @_;
my ($self) = @_;
my $r = sysread($self->{sock}, $$bref, 8192);
if ($r) {
- $fh->write($$bref);
- unless ($http->{closed}) { # PublicInbox::DS sets this
- if (scalar @{$http->{wbuf}}) {
- $self->watch_read(0);
- $http->write(restart_read_cb($self));
+ $fh->write($$bref); # may call $http->close
+
+ if ($http->{sock}) { # !closed
+ if ($http->{wbuf}) {
+ # HTTP client could not keep up, so
+ # stop reading and buffering.
+ $self->watch(0);
+
+ # Tell the HTTP socket to restart us
+ # when HTTP client is done draining
+ # $http->{wbuf}:
+ $http->enqueue_restart_pass;
}
- # stay in watch_read, but let other clients
+ # stay in EPOLLIN, but let other clients
# get some work done, too.
return;
}
# fall through to close below...
} elsif (!defined $r) {
- return if $!{EAGAIN} || $!{EINTR};
+ return restart_read($self) if $! == EAGAIN;
}
# Done! Error handling will happen in $fh->close
# called by the {cleanup} handler
- $http->{forward} = undef;
+ delete $http->{forward};
$self->close;
}
}
sub event_step { $_[0]->{cb}->(@_) }
sub close {
- my $self = shift;
- my $cleanup = $self->{cleanup};
- $self->{cleanup} = $self->{cb} = undef;
- $self->SUPER::close(@_);
+ my $self = $_[0];
+ delete $self->{cb};
+ $self->SUPER::close;
# we defer this to the next timer loop since close is deferred
- PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+ if (my $cleanup = delete $self->{cleanup}) {
+ PublicInbox::EvCleanup::next_tick($cleanup);
+ }
}
1;
listen($s, 1024);
IO::Handle::blocking($s, 0);
my $self = fields::new($class);
- $self->SUPER::new($s, 1); # calls epoll_create for the first socket
- $self->watch_read(1);
+ $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()|
+ PublicInbox::DS::EPOLLEXCLUSIVE());
$self->{post_accept} = $cb;
$self
}
-# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# Each instance of this represents a NNTP client socket
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+PublicInbox::DS->import(qw(now msg_more));
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
-
-sub now () { clock_gettime(CLOCK_MONOTONIC) };
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
$nextt = undef;
my $q = $nextq;
$nextq = [];
- foreach my $nntp (@$q) {
- # for request && response protocols, always finish writing
- # before finishing reading:
- if (my $long_cb = $nntp->{long_res}) {
- $nntp->write($long_cb);
- } else {
- # pipelined request, we bypassed socket-readiness
- # checks to get here:
- event_step($nntp);
-
- # maybe there's more pipelined data, or we'll have
- # to register it for socket-readiness notifications
- if (!$nntp->{long_res} && !$nntp->{closed}) {
- check_read($nntp);
- }
- }
- }
+ event_step($_) for @$q;
+}
+
+sub requeue ($) {
+ push @$nextq, $_[0];
+ $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
}
sub update_idle_time ($) {
my ($self) = @_;
- my $sock = $self->{sock} or return;
- my $fd = fileno($sock);
- defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
+ my $sock = $self->{sock} or return;
+ $EXPMAP->{fileno($sock)} = [ now(), $self ];
}
sub expire_old () {
my $exp = $EXPTIME;
my $old = $now - $exp;
my $nr = 0;
+ my $closed = 0;
my %new;
while (my ($fd, $v) = each %$EXPMAP) {
my ($idle_time, $nntp) = @$v;
if ($idle_time < $old) {
- $nntp->close; # idempotent
+ if ($nntp->shutdn) {
+ $closed++;
+ } else {
+ ++$nr;
+ $new{$fd} = $v;
+ }
} else {
++$nr;
$new{$fd} = $v;
$expt = undef;
# noop to kick outselves out of the loop ASAP so descriptors
# really get closed
- PublicInbox::EvCleanup::asap(sub {});
+ PublicInbox::EvCleanup::asap(sub {}) if $closed;
}
}
+sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
+
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ my $ev = EPOLLIN;
+ my $wbuf;
+ if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+ $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
+ }
+ $self->SUPER::new($sock, $ev | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
- res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
- $self->{rbuf} = '';
- $self->watch_read(1);
+ if ($wbuf) {
+ $self->{wbuf} = $wbuf;
+ } else {
+ greet($self);
+ }
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
my $res = eval { $req->($self, @args) };
my $err = $@;
- if ($err && !$self->{closed}) {
+ if ($err && $self->{sock}) {
local $/ = "\n";
chomp($l);
err($self, 'error from: %s (%s)', $l, $err);
sub list_overview_fmt ($) {
my ($self) = @_;
- do_more($self, $OVERVIEW_FMT);
+ msg_more($self, $OVERVIEW_FMT);
}
sub list_headers ($;$) {
my ($self) = @_;
- do_more($self, $LIST_HEADERS);
+ msg_more($self, $LIST_HEADERS);
}
sub list_active ($;$) {
}
my @now = $gmt ? gmtime : localtime;
my ($YYYY, $MM, $DD);
- if (length($date) == 8) { # RFC 3977 allows YYYYMMDD
+ if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD
($YYYY, $MM, $DD) = unpack('A4A2A2', $date);
} else { # legacy clients send YYMMDD
($YYYY, $MM, $DD) = unpack('A2A2A2', $date);
sub cmd_quit ($) {
my ($self) = @_;
res($self, '205 closing connection - goodbye!');
- $self->close;
+ $self->shutdn;
undef;
}
$s->body_set('');
$body =~ s/^\./../smg;
$body =~ s/(?<!\r)\n/\r\n/sg;
- do_more($self, $body);
- do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+ msg_more($self, $body);
+ msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
'.'
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "220 $n <$mid> article retrieved - head and body follow");
- do_more($self, _header($s));
- do_more($self, "\r\n");
+ msg_more($self, _header($s));
+ msg_more($self, "\r\n");
simple_body_write($self, $s);
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "221 $n <$mid> article retrieved - head follows");
- do_more($self, _header($s));
+ msg_more($self, _header($s));
'.'
}
}
sub long_response ($$) {
- my ($self, $cb) = @_;
- die "BUG: nested long response" if $self->{long_res};
+ my ($self, $cb) = @_; # cb returns true if more, false if done
my $fd = fileno($self->{sock});
defined $fd or return;
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
- $self->watch_read(0);
my $t0 = now();
- $self->{long_res} = sub {
+ my $long_cb; # DANGER: self-referential
+ $long_cb = sub {
+ # wbuf is unset or empty, here; $cb may add to it
my $more = eval { $cb->() };
- if ($@ || $self->{closed}) {
- $self->{long_res} = undef;
-
+ if ($@ || !$self->{sock}) { # something bad happened...
+ $long_cb = undef;
+ my $diff = now() - $t0;
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
- $@, now() - $t0);
- }
- if ($self->{closed}) {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
- } else {
- update_idle_time($self);
- check_read($self);
+ $@, $diff);
}
- } elsif ($more) { # scalar @{$self->{wbuf}}:
+ out($self, " deferred[$fd] aborted - %0.6f", $diff);
+ $self->close;
+ } elsif ($more) { # $self->{wbuf}:
+ update_idle_time($self);
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
- update_idle_time($self);
+ my $wbuf = $self->{wbuf} ||= [];
+ push @$wbuf, $long_cb;
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+ # wbuf may be populated by $cb, no need to rearm if so:
+ requeue($self) if scalar(@$wbuf) == 1;
} else { # all done!
- $self->{long_res} = undef;
- check_read($self);
+ $long_cb = undef;
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+ requeue($self) unless $self->{wbuf};
}
};
- $self->{long_res}->(); # kick off!
+ $self->write($long_cb); # kick off!
undef;
}
$tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
}
utf8::encode($tmp);
- do_more($self, $tmp);
+ msg_more($self, $tmp);
$cur = $msgs->[-1]->{num} + 1;
});
}
});
}
+sub cmd_starttls ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ # RFC 4642 2.2.1
+ (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+ my $opt = $self->{nntpd}->{accept_tls} or
+ return '580 can not initiate TLS negotiation';
+ res($self, '382 Continue with TLS negotiation');
+ $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+ requeue($self) if PublicInbox::DS::accept_tls_step($self);
+ undef;
+}
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
'223 '.join(' ', @paths);
}
-sub res ($$) {
- my ($self, $line) = @_;
- do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
-sub more ($$) {
- my ($self, $line) = @_;
- do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
sub do_write ($$) {
- my ($self, $data) = @_;
- my $done = $self->write($data);
- return 0 if $self->{closed};
-
- # Do not watch for readability if we have data in the queue,
- # instead re-enable watching for readability when we can
- $self->watch_read(0) if (!$done || $self->{long_res});
+ my $self = $_[0];
+ my $done = $self->write(\($_[1]));
+ return 0 unless $self->{sock};
$done;
}
printf { $self->{nntpd}->{out} } $fmt."\n", @args;
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
- my ($self, $data) = @_;
- if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
- my $n = send($self->{sock}, $data, MSG_MORE);
- if (defined $n) {
- my $dlen = length($data);
- return 1 if $n == $dlen; # all done!
- $data = substr($data, $n, $dlen - $n);
- }
- }
- do_write($self, $data);
-}
-
+# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
sub event_step {
my ($self) = @_;
- return if $self->{closed};
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- update_idle_time($self);
- $self->write(undef);
- return if $self->{closed} || scalar(@$wbuf);
- }
- return if $self->{long_res};
+ return unless $self->flush_write && $self->{sock};
+
+ update_idle_time($self);
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
use constant LINE_MAX => 512; # RFC 977 section 2.3
- my $rbuf = \($self->{rbuf});
- my $r;
+ my $rbuf = $self->{rbuf} // (\(my $x = ''));
+ my $r = 1;
if (index($$rbuf, "\n") < 0) {
- my $off = length($$rbuf);
- $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
- unless (defined $r) {
- return if $!{EAGAIN};
- return $self->close;
- }
- return $self->close if $r == 0;
+ my $off = bytes::length($$rbuf);
+ $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
}
- $r = 1;
while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
my $line = $1;
return $self->close if $line =~ /[[:cntrl:]]/s;
my $t0 = now();
my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
- my $d = $self->{long_res} ?
- " deferred[$fd]" : '';
- out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+ my $pending = $self->{wbuf} ? ' pending' : '';
+ out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
}
return $self->close if $r < 0;
- my $len = length($$rbuf);
+ my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
- update_idle_time($self);
-}
-
-sub check_read {
- my ($self) = @_;
- if (index($self->{rbuf}, "\n") >= 0) {
- # Force another read if there is a pipelined request.
- # We don't know if the socket has anything for us to read,
- # and we must double-check again by the time the timer fires
- # in case we really did dispatch a read event and started
- # another long response.
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+ if ($len) {
+ $self->{rbuf} = $rbuf;
} else {
- # no pipelined requests available, let the kernel know
- # to wake us up if there's more
- $self->watch_read(1); # PublicInbox::DS::watch_read
+ delete $self->{rbuf};
}
+ update_idle_time($self);
+
+ # maybe there's more pipelined data, or we'll have
+ # to register it for socket-readiness notifications
+ requeue($self) unless $self->{wbuf};
}
sub not_idle_long ($$) {
my ($self, $now) = @_;
- my $sock = $self->{sock} or return;
- defined(my $fd = fileno($sock)) or return;
- my $ary = $EXPMAP->{$fd} or return;
+ my $sock = $self->{sock} or return;
+ my $ary = $EXPMAP->{fileno($sock)} or return;
my $exp_at = $ary->[0] + $EXPTIME;
$exp_at > $now;
}
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} ||
- scalar(@{$self->{wbuf}}) || not_idle_long($self, $now));
+ ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
}
1;
out => \*STDOUT,
grouplist => [],
servername => $name,
+ greet => \"201 $name ready - post via email\r\n",
+ # accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... }
}, $class;
}
sub new ($$$) {
my ($class, $pipe, $cb) = @_;
my $self = fields::new($class);
- $self->SUPER::new($pipe);
+ $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN());
$self->{cb} = $cb;
- $self->watch_read(1);
$self;
}
use PublicInbox::Spawn qw(popen_rd);
require Plack::Util;
+# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
+use Errno qw(EAGAIN EINTR);
+
my $def_limiter;
# declares a command to spawn (but does not spawn it).
eval { $qx_cb->($qx) };
$qx = undef;
};
- my $rpipe;
+ my $rpipe; # comes from popen_rd
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
my $r = sysread($rpipe, my $buf, 8192);
} elsif (defined $r) {
$r ? $qx->write($buf) : $end->();
} else {
- return if $!{EAGAIN} || $!{EINTR}; # loop again
+ return if $! == EAGAIN || $! == EINTR; # loop again
$end->();
}
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
$self->start($limiter, sub { # may run later, much later...
- ($rpipe) = @_;
+ ($rpipe) = @_; # popen_rd result
if ($async) {
# PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
$async = $async->($rpipe, $cb, $end);
my $buf = '';
my $rd_hdr = sub {
my $r = sysread($rpipe, $buf, 1024, length($buf));
- return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+ return if !defined($r) && $! == EAGAIN || $! == EINTR;
$parse_hdr->($r, \$buf);
};
my ($cmd, $env, $opts) = @_;
pipe(my ($r, $w)) or die "pipe: $!\n";
$opts ||= {};
- my $blocking = $opts->{Blocking};
- IO::Handle::blocking($r, $blocking) if defined $blocking;
$opts->{1} = fileno($w);
my $pid = spawn($cmd, $env, $opts);
return unless defined $pid;
$VERSION = "0.25";
@ISA = qw(Exporter);
@EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait
- EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND
+ EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE);
+ EPOLLONESHOT EPOLLEXCLUSIVE);
%EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
- EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND
+ EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE)],
+ EPOLLONESHOT EPOLLEXCLUSIVE)],
sendfile => [qw(sendfile)],
);
use constant EPOLLIN => 1;
use constant EPOLLOUT => 4;
-use constant EPOLLERR => 8;
-use constant EPOLLHUP => 16;
-use constant EPOLLRDBAND => 128;
+# use constant EPOLLERR => 8;
+# use constant EPOLLHUP => 16;
+# use constant EPOLLRDBAND => 128;
use constant EPOLLEXCLUSIVE => (1 << 28);
+use constant EPOLLONESHOT => (1 << 30);
+# use constant EPOLLET => (1 << 31);
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;
use constant EPOLL_CTL_MOD => 3;
return $rv;
}
-our ($sysname, $nodename, $release, $version, $machine) = POSIX::uname();
our (
$SYS_epoll_create,
our $no_deprecated = 0;
if ($^O eq "linux") {
+ my $machine = (POSIX::uname())[-1];
# whether the machine requires 64-bit numbers to be on 8-byte
# boundaries.
my $u64_mod_8 = 0;
sub epoll_defined { return $SYS_epoll_create ? 1 : 0; }
-# ARGS: (size) -- but in modern Linux 2.6, the
-# size doesn't even matter (radix tree now, not hash)
sub epoll_create {
- return -1 unless defined $SYS_epoll_create;
- my $epfd = eval { syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0) };
- return -1 if $@;
- return $epfd;
+ syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0);
}
# epoll_ctl wrapper
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# IO::Socket::SSL support code
+package PublicInbox::TLS;
+use strict;
+use IO::Socket::SSL;
+require Carp;
+use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLOUT);
+
+sub err () { $SSL_ERROR }
+
+# returns the EPOLL event bit which matches the existing SSL error
+sub epollbit () {
+ if ($! == EAGAIN) {
+ return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
+ return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
+ die "unexpected SSL error: $SSL_ERROR";
+ }
+ 0;
+}
+
+1;
my $nntpd = PublicInbox::NNTPD->new;
PublicInbox::Daemon::run('0.0.0.0:119',
sub { $nntpd->refresh_groups }, # refresh
- sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }); # post_accept
+ sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }, # post_accept
+ $nntpd);
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+use strict;
+use warnings;
+use Test::More;
+use PublicInbox::Syscall qw(:epoll);
+my $cls = 'PublicInbox::DSPoll';
+use_ok $cls;
+my $p = $cls->new;
+
+my ($r, $w, $x, $y);
+pipe($r, $w) or die;
+pipe($x, $y) or die;
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($r), EPOLLIN), 0, 'add EPOLLIN');
+my $events = [];
+my $n = epoll_wait($p, 9, 0, $events);
+is_deeply($events, [], 'no events set');
+is($n, 0, 'nothing ready, yet');
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($w), EPOLLOUT|EPOLLONESHOT), 0,
+ 'add EPOLLOUT|EPOLLONESHOT');
+$n = epoll_wait($p, 9, -1, $events);
+is($n, 1, 'got POLLOUT event');
+is($events->[0]->[0], fileno($w), '$w ready');
+
+$n = epoll_wait($p, 9, 0, $events);
+is($n, 0, 'nothing ready after oneshot');
+is_deeply($events, [], 'no events set after oneshot');
+
+syswrite($w, '1') == 1 or die;
+for my $t (0..1) {
+ $n = epoll_wait($p, 9, $t, $events);
+ is($events->[0]->[0], fileno($r), "level-trigger POLLIN ready #$t");
+ is($n, 1, "only event ready #$t");
+}
+syswrite($y, '1') == 1 or die;
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($x), EPOLLIN|EPOLLONESHOT), 0,
+ 'EPOLLIN|EPOLLONESHOT add');
+is(epoll_wait($p, 9, -1, $events), 2, 'epoll_wait has 2 ready');
+my @fds = sort(map { $_->[0] } @$events);
+my @exp = sort((fileno($r), fileno($x)));
+is_deeply(\@fds, \@exp, 'got both ready FDs');
+
+# EPOLL_CTL_DEL doesn't matter for kqueue, we do it in native epoll
+# to avoid a kernel-wide lock; but its not needed for native kqueue
+# paths so DSKQXS makes it a noop (as did Danga::Socket::close).
+SKIP: {
+ if ($cls ne 'PublicInbox::DSPoll') {
+ skip "$cls doesn't handle EPOLL_CTL_DEL", 2;
+ }
+ is(epoll_ctl($p, EPOLL_CTL_DEL, fileno($r), 0), 0, 'EPOLL_CTL_DEL OK');
+ $n = epoll_wait($p, 9, 0, $events);
+ is($n, 0, 'nothing ready after EPOLL_CTL_DEL');
+};
+
+done_testing;
use IO::Socket;
use IO::Socket::UNIX;
use Fcntl qw(:seek);
-use Socket qw(IPPROTO_TCP TCP_NODELAY);
+use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);
use POSIX qw(mkfifo);
require './t/common.perl';
my $tmpdir = tempdir('httpd-corner-XXXXXX', TMPDIR => 1, CLEANUP => 1);
Listen => 1024,
);
my $sock = IO::Socket::INET->new(%opts);
+
+# Make sure we don't clobber socket options set by systemd or similar
+# using socket activation:
+my ($defer_accept_val, $accf_arg);
+if ($^O eq 'linux') {
+ setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die;
+ my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
+ defined $x or die "getsockopt: $!";
+ $defer_accept_val = unpack('i', $x);
+ if ($defer_accept_val <= 0) {
+ die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val";
+ }
+} elsif ($^O eq 'freebsd' && system('kldstat -m accf_data >/dev/null') == 0) {
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ $accf_arg = pack('a16a240', 'dataready', '');
+ setsockopt($sock, SOL_SOCKET, $var, $accf_arg) or die "setsockopt: $!";
+}
+
my $upath = "$tmpdir/s";
my $unix = IO::Socket::UNIX->new(
Listen => 1024,
is(scalar(grep(/CLOSE FAIL/, @$after)), 1, 'body->close not called');
}
-{
+SKIP: {
my $conn = conn_for($sock, 'excessive header');
$SIG{PIPE} = 'IGNORE';
$conn->write("GET /callback HTTP/1.0\r\n");
is($body, sha1_hex(''), 'read expected body #2');
}
+SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
+ is(unpack('i', $x), $defer_accept_val,
+ 'TCP_DEFER_ACCEPT unchanged if previously set');
+};
+SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd';
+ skip 'accf_data not loaded: kldload accf_data' if !defined $accf_arg;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ defined(my $x = getsockopt($sock, SOL_SOCKET, $var)) or die;
+ is($x, $accf_arg, 'SO_ACCEPTFILTER unchanged if previously set');
+};
+
done_testing();
sub capture {
}
use File::Temp qw/tempdir/;
use IO::Socket::INET;
+use Socket qw(IPPROTO_TCP);
require './t/common.perl';
# FIXME: too much setup
'fsck on cloned directory successful');
}
+SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
+ ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set');
+};
+SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd';
+ if (system('kldstat -m accf_http >/dev/null') != 0) {
+ skip 'accf_http not loaded: kldload accf_http', 1;
+ }
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ my $x = getsockopt($sock, SOL_SOCKET, $var);
+ like($x, qr/\Ahttpready\0+\z/, 'got httpready accf for HTTP');
+};
+
done_testing();
1;
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw(tempdir);
+use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET);
+# IO::Poll and Net::NNTP are part of the standard library, but
+# distros may split them off...
+foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for $0" if $@;
+}
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+unless (-r $key && -r $cert) {
+ plan skip_all =>
+ "certs/ missing for $0, run ./create-certs.perl in certs/";
+}
+
+use_ok 'PublicInbox::TLS';
+use_ok 'IO::Socket::SSL';
+require './t/common.perl';
+require PublicInbox::InboxWritable;
+require PublicInbox::MIME;
+require PublicInbox::SearchIdx;
+my $version = 2; # v2 needs newer git
+require_git('2.6') if $version >= 2;
+my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $mainrepo = "$tmpdir";
+my $pi_config = "$tmpdir/pi_config";
+my $group = 'test-nntpd-tls';
+my $addr = $group . '@example.com';
+my $nntpd = 'blib/script/public-inbox-nntpd';
+my %opts = (
+ LocalAddr => '127.0.0.1',
+ ReuseAddr => 1,
+ Proto => 'tcp',
+ Type => SOCK_STREAM,
+ Listen => 1024,
+);
+my $starttls = IO::Socket::INET->new(%opts);
+my $nntps = IO::Socket::INET->new(%opts);
+my ($pid, $tail_pid);
+END {
+ foreach ($pid, $tail_pid) {
+ kill 'TERM', $_ if defined $_;
+ }
+};
+
+my $ibx = PublicInbox::Inbox->new({
+ mainrepo => $mainrepo,
+ name => 'nntpd-tls',
+ version => $version,
+ -primary_address => $addr,
+ indexlevel => 'basic',
+});
+$ibx = PublicInbox::InboxWritable->new($ibx, {nproc=>1});
+$ibx->init_inbox(0);
+{
+ open my $fh, '>', $pi_config or die "open: $!\n";
+ print $fh <<EOF
+[publicinbox "nntpd-tls"]
+ mainrepo = $mainrepo
+ address = $addr
+ indexlevel = basic
+ newsgroup = $group
+EOF
+ ;
+ close $fh or die "close: $!\n";
+}
+
+{
+ my $im = $ibx->importer(0);
+ my $mime = PublicInbox::MIME->new(do {
+ open my $fh, '<', 't/data/0001.patch' or die;
+ local $/;
+ <$fh>
+ });
+ ok($im->add($mime), 'message added');
+ $im->done;
+ if ($version == 1) {
+ my $s = PublicInbox::SearchIdx->new($ibx, 1);
+ $s->index_sync;
+ }
+}
+
+my $nntps_addr = $nntps->sockhost . ':' . $nntps->sockport;
+my $starttls_addr = $starttls->sockhost . ':' . $starttls->sockport;
+my $env = { PI_CONFIG => $pi_config };
+
+for my $args (
+ [ "--cert=$cert", "--key=$key",
+ "-lnntps://$nntps_addr",
+ "-lnntp://$starttls_addr" ],
+) {
+ for ($out, $err) {
+ open my $fh, '>', $_ or die "truncate: $!";
+ }
+ if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+ $tail_pid = fork;
+ if (defined $tail_pid && $tail_pid == 0) {
+ exec(split(' ', $tail_cmd), $out, $err);
+ }
+ }
+ my $cmd = [ $nntpd, '-W0', @$args, "--stdout=$out", "--stderr=$err" ];
+ $pid = spawn_listener($env, $cmd, [ $starttls, $nntps ]);
+ my %o = (
+ SSL_hostname => 'server.local',
+ SSL_verifycn_name => 'server.local',
+ SSL_verify_mode => SSL_VERIFY_PEER(),
+ SSL_ca_file => 'certs/test-ca.pem',
+ );
+ my $expect = { $group => [qw(1 1 n)] };
+
+ # start negotiating a slow TLS connection
+ my $slow = IO::Socket::INET->new(
+ Proto => 'tcp',
+ PeerAddr => $nntps_addr,
+ Type => SOCK_STREAM,
+ Blocking => 0,
+ );
+ $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o);
+ my $slow_done = $slow->connect_SSL;
+ diag('W: connect_SSL early OK, slow client test invalid') if $slow_done;
+ my @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+ # we should call connect_SSL much later...
+
+ # NNTPS
+ my $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+ my $list = $c->list;
+ is_deeply($list, $expect, 'NNTPS LIST works');
+ is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works');
+ is(0, sysread($c, my $buf, 1), 'got EOF after QUIT');
+
+ # STARTTLS
+ $c = Net::NNTP->new($starttls_addr, %o);
+ $list = $c->list;
+ is_deeply($list, $expect, 'plain LIST works');
+ ok($c->starttls, 'STARTTLS succeeds');
+ is($c->code, 382, 'got 382 for STARTTLS');
+ $list = $c->list;
+ is_deeply($list, $expect, 'LIST works after STARTTLS');
+
+ # Net::NNTP won't let us do dumb things, but we need to test
+ # dumb things, so use Net::Cmd directly:
+ my $n = $c->command('STARTTLS')->response();
+ is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again');
+ is($c->code, 502, '502 according to RFC 4642 sec#2.2.1');
+
+ # STARTTLS with bad hostname
+ $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.invalid';
+ $c = Net::NNTP->new($starttls_addr, %o);
+ $list = $c->list;
+ is_deeply($list, $expect, 'plain LIST works again');
+ ok(!$c->starttls, 'STARTTLS fails with bad hostname');
+ $c = Net::NNTP->new($starttls_addr, %o);
+ $list = $c->list;
+ is_deeply($list, $expect, 'not broken after bad negotiation');
+
+ # NNTPS with bad hostname
+ $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+ is($c, undef, 'NNTPS fails with bad hostname');
+ $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local';
+ $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+ ok($c, 'NNTPS succeeds again with valid hostname');
+
+ # slow TLS connection did not block the other fast clients while
+ # connecting, finish it off:
+ until ($slow_done) {
+ IO::Poll::_poll(-1, @poll);
+ $slow_done = $slow->connect_SSL and last;
+ @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+ }
+ $slow->blocking(1);
+ ok(sysread($slow, my $greet, 4096) > 0, 'slow got greeting');
+ like($greet, qr/\A201 /, 'got expected greeting');
+ is(syswrite($slow, "QUIT\r\n"), 6, 'slow wrote QUIT');
+ ok(sysread($slow, my $end, 4096) > 0, 'got EOF');
+ is(sysread($slow, my $eof, 4096), 0, 'got EOF');
+ $slow = undef;
+
+ SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($nntps, IPPROTO_TCP, $var)) or die;
+ ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on NNTPS');
+ defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die;
+ is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP');
+ };
+ SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd';
+ if (system('kldstat -m accf_data >/dev/null')) {
+ skip 'accf_data not loaded? kldload accf_data', 2;
+ }
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ my $x = getsockopt($nntps, SOL_SOCKET, $var);
+ like($x, qr/\Adataready\0+\z/, 'got dataready accf for NNTPS');
+ $x = getsockopt($starttls, IPPROTO_TCP, $var);
+ is($x, undef, 'no BSD accept filter for plain NNTP');
+ };
+
+ $c = undef;
+ kill('TERM', $pid);
+ is($pid, waitpid($pid, 0), 'nntpd exited successfully');
+ is($?, 0, 'no error in exited process');
+ $pid = undef;
+ my $eout = eval {
+ open my $fh, '<', $err or die "open $err failed: $!";
+ local $/;
+ <$fh>;
+ };
+ unlike($eout, qr/wide/i, 'no Wide character warnings');
+ if (defined $tail_pid) {
+ kill 'TERM', $tail_pid;
+ waitpid($tail_pid, 0);
+ $tail_pid = undef;
+ }
+}
+done_testing();
+1;
is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works');
is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works');
is_deeply($n->listgroup($group), [1], 'listgroup OK');
+ ok(!$n->starttls, 'STARTTLS fails when unconfigured');
+ is($n->code, 580, 'got 580 code on server w/o TLS');
%opts = (
PeerAddr => $host_port,
isnt($?, 0, '$? set properly: '.$?);
}
-{
- my ($fh, $pid) = popen_rd([qw(sleep 60)], undef, { Blocking => 0 });
- ok(defined $pid && $pid > 0, 'returned pid when array requested');
- is(kill(0, $pid), 1, 'child process is running');
- ok(!defined(sysread($fh, my $buf, 1)) && $!{EAGAIN},
- 'sysread returned quickly with EAGAIN');
- is(kill(9, $pid), 1, 'child process killed early');
- is(waitpid($pid, 0), $pid, 'child process reapable');
- isnt($?, 0, '$? set properly: '.$?);
-}
-
SKIP: {
eval {
require BSD::Resource;