t/html_index.t
t/httpd-corner.psgi
t/httpd-corner.t
+t/httpd-https.t
t/httpd-unix.t
t/httpd.t
t/hval.t
ExecStart = /usr/local/bin/public-inbox-httpd \
-1 /var/log/public-inbox/httpd.out.log
StandardError = syslog
+
+# NonBlocking is REQUIRED to avoid a race condition if running
+# simultaneous services
+NonBlocking = true
Sockets = public-inbox-httpd.socket
+
KillSignal = SIGQUIT
User = nobody
Group = nogroup
ExecStart = /usr/local/bin/public-inbox-nntpd \
-1 /var/log/public-inbox/nntpd.out.log
StandardError = syslog
+
+# NonBlocking is REQUIRED to avoid a race condition if running
+# simultaneous services
+NonBlocking = true
Sockets = public-inbox-nntpd.socket
+
KillSignal = SIGQUIT
User = nobody
Group = nogroup
# any PSGI server ought to work,
# but public-inbox-httpd supports socket activation like unsubscribe.milter
ExecStart = /usr/local/bin/public-inbox-httpd -W0 /etc/unsubscribe.psgi
+
+# NonBlocking is REQUIRED to avoid a race condition if running
+# simultaneous services
+NonBlocking = true
Sockets = unsubscribe-psgi.socket
+
# we need to modify the mlmmj spool
User = mlmmj
KillMode = process
use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
+ 'rbuf', # scalarref, usually undef
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
);
use Carp qw(croak confess carp);
require File::Spec;
+my $nextq = []; # queue for next_tick
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Epoll, # Global epoll fd (or DSKQXS ref)
sub AddTimer {
my ($class, $secs, $coderef) = @_;
- if (!$secs) {
- my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
- unshift(@Timers, $timer);
- return $timer;
- }
-
my $fire_time = now() + $secs;
my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
sub now () { clock_gettime(CLOCK_MONOTONIC) }
+sub next_tick () {
+ my $q = $nextq;
+ $nextq = [];
+ for (@$q) {
+ if (ref($_) eq 'CODE') {
+ $_->();
+ } else {
+ $_->event_step;
+ }
+ }
+}
+
# runs timers and returns milliseconds for next one, or next event loop
sub RunTimers {
- return $LoopTimeout unless @Timers;
+ next_tick();
+
+ return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
my $now = now();
$to_run->[1]->($now) if $to_run->[1];
}
+ # timers may enqueue into nextq:
+ return 0 if (@$nextq || @ToClose);
+
return $LoopTimeout unless @Timers;
# convert time to an even number of milliseconds, adding 1
# now we can close sockets that wanted to close during our event processing.
# (we didn't want to close them during the loop, as we didn't want fd numbers
# being reused and confused during the event loop)
- while (my $sock = shift @ToClose) {
- my $fd = fileno($sock);
-
- # close the socket. (not a PublicInbox::DS close)
- CORE::close($sock);
-
- # and now we can finally remove the fd from the map. see
- # comment above in ->close.
- delete $DescriptorMap{$fd};
- }
-
+ delete($DescriptorMap{fileno($_)}) for @ToClose;
+ @ToClose = (); # let refcounting drop everything all at once
# by default we keep running, unless a postloop callback (either per-object
# or global) cancels it
### I N S T A N C E M E T H O D S
#####################################################################
+sub requeue ($) { push @$nextq, $_[0] }
+
=head2 C<< $obj->close >>
Close the socket.
$written;
}
+sub epbit ($$) { # (sock, default)
+ ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+}
+
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
goto next_buf;
}
} elsif ($! == EAGAIN) {
+ epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
$self->{wbuf_off} = $off;
- watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
} else {
return $self->close;
1; # all done
}
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+ my ($self, $rbuf) = @_;
+ if ($$rbuf eq '') { # who knows how long till we can read again
+ delete $self->{rbuf};
+ } else {
+ $self->{rbuf} = $rbuf;
+ }
+}
+
+sub do_read ($$$;$) {
my ($self, $rbuf, $len, $off) = @_;
- my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+ my $r = sysread(my $sock = $self->{sock}, $$rbuf, $len, $off // 0);
return ($r == 0 ? $self->close : $r) if defined $r;
# common for clients to break connections without warning,
# would be too noisy to log here:
- if (ref($self) eq 'IO::Socket::SSL') {
- my $ev = PublicInbox::TLS::epollbit() or return $self->close;
- watch($self, $ev | EPOLLONESHOT);
- } elsif ($! == EAGAIN) {
- watch($self, EPOLLIN | EPOLLONESHOT);
+ if ($! == EAGAIN) {
+ epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
+ rbuf_idle($self, $rbuf);
+ 0;
} else {
$self->close;
}
if (defined $written) {
return 1 if $written == $to_write;
+ requeue($self); # runs: event_step -> flush_write
} elsif ($! == EAGAIN) {
+ epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
$written = 0;
} else {
return $self->close;
}
+
+ # deal with EAGAIN or partial write:
my $tmpio = tmpio($self, $bref, $written) or return 0;
# wbuf may be an empty array if we're being called inside
# ->flush_write via CODE bref:
push @{$self->{wbuf} ||= []}, $tmpio;
- watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
# queue up the unwritten substring:
my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
$self->{wbuf} = [ $tmpio ];
- watch($self, EPOLLOUT|EPOLLONESHOT);
+ epwait($sock, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
$self->write(\($_[1]));
}
-sub watch ($$) {
- my ($self, $ev) = @_;
- my $sock = $self->{sock} or return;
+sub epwait ($$) {
+ my ($sock, $ev) = @_;
epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
confess("EPOLL_CTL_MOD $!");
- 0;
}
-sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
-
# return true if complete, false if incomplete (or failure)
sub accept_tls_step ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
return 1 if $sock->accept_SSL;
return $self->close if $! != EAGAIN;
- if (my $ev = PublicInbox::TLS::epollbit()) {
- unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
- return watch($self, $ev | EPOLLONESHOT);
- }
- drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+ epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+ unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ 0;
}
+# return true if complete, false if incomplete (or failure)
sub shutdn_tls_step ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
return $self->close if $! != EAGAIN;
- if (my $ev = PublicInbox::TLS::epollbit()) {
- unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
- return watch($self, $ev | EPOLLONESHOT);
- }
- drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+ epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+ unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+ 0;
}
# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
$self->close;
}
}
-
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {
use parent qw(IO::KQueue);
use parent qw(Exporter);
use IO::KQueue;
-use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET
+ EPOLL_CTL_ADD EPOLL_CTL_MOD EPOLL_CTL_DEL);
our @EXPORT_OK = qw(epoll_ctl epoll_wait);
my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
sub kq_flag ($$) {
my ($bit, $ev) = @_;
if ($ev & $bit) {
- my $fl = EV_ADD | EV_ENABLE;
- ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
+ my $fl = EV_ENABLE;
+ $fl |= EV_CLEAR if $fl & EPOLLET;
+
+ # EV_DISPATCH matches EPOLLONESHOT semantics more closely
+ # than EV_ONESHOT, in that EV_ADD is not required to
+ # re-enable a disabled watch.
+ ($ev & EPOLLONESHOT) ? ($fl | EV_DISPATCH) : $fl;
} else {
- EV_ADD | EV_DISABLE;
+ EV_DISABLE;
}
}
sub epoll_ctl {
my ($self, $op, $fd, $ev) = @_;
- if ($op != EPOLL_CTL_DEL) {
+ if ($op == EPOLL_CTL_MOD) {
$self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev));
$self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev));
+ } elsif ($op == EPOLL_CTL_DEL) {
+ $self->EV_SET($fd, EVFILT_READ, EV_DISABLE);
+ $self->EV_SET($fd, EVFILT_WRITE, EV_DISABLE);
+ } else {
+ $self->EV_SET($fd, EVFILT_READ, EV_ADD|kq_flag(EPOLLIN, $ev));
+ $self->EV_SET($fd, EVFILT_WRITE, EV_ADD|kq_flag(EPOLLOUT, $ev));
}
0;
}
sub worker_quit {
- my ($reason) = @_;
# killing again terminates immediately:
exit unless @listeners;
$_->close foreach @listeners; # call PublicInbox::DS::close
@listeners = ();
- $reason->close if ref($reason) eq 'PublicInbox::ParentPipe';
my $proc_name;
my $warn = 0;
} else {
reopen_logs();
$set_user->() if $set_user;
- $SIG{USR2} = sub { worker_quit('USR2') if upgrade() };
+ $SIG{USR2} = sub { worker_quit() if upgrade() };
$refresh->();
}
$uid = $gid = undef;
-# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# event cleanups (currently for PublicInbox::DS)
+# event cleanups (for PublicInbox::DS)
package PublicInbox::EvCleanup;
use strict;
use warnings;
-use base qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+require PublicInbox::DS;
+# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers
my $ENABLED;
sub enabled { $ENABLED }
sub enable { $ENABLED = 1 }
-my $singleton;
-my $asapq = [ [], undef ];
-my $nextq = [ [], undef ];
my $laterq = [ [], undef ];
-sub once_init () {
- my $self = fields::new('PublicInbox::EvCleanup');
- my ($r, $w);
-
- # This is a dummy pipe which is always writable so it can always
- # fires in the next event loop iteration.
- pipe($r, $w) or die "pipe: $!";
- fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
- $self->SUPER::new($w, 0);
-
- # always writable, since PublicInbox::EvCleanup::event_step
- # never drains wbuf. We can avoid wasting a hash slot by
- # stuffing the read-end of the pipe into the never-to-be-touched
- # wbuf
- $self->{wbuf} = $r;
- $self;
-}
-
-sub _run_all ($) {
- my ($q) = @_;
-
- my $run = $q->[0];
- $q->[0] = [];
- $q->[1] = undef;
- $_->() foreach @$run;
-}
-
-# ensure PublicInbox::DS::ToClose processing after timers fire
-sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-
-# Called by PublicInbox::DS
-sub event_step { _run_all($asapq) }
-
-sub _run_next () {
- _run_all($nextq);
- _asap_close();
-}
-
sub _run_later () {
- _run_all($laterq);
- _asap_close();
-}
-
-sub _asap_timer () {
- $singleton ||= once_init();
- $singleton->watch(EPOLLOUT|EPOLLONESHOT);
- 1;
-}
-
-sub asap ($) {
- my ($cb) = @_;
- push @{$asapq->[0]}, $cb;
- $asapq->[1] ||= _asap_timer();
-}
-
-sub next_tick ($) {
- my ($cb) = @_;
- push @{$nextq->[0]}, $cb;
- $nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next);
+ my $run = $laterq->[0];
+ $laterq->[0] = [];
+ $laterq->[1] = undef;
+ $_->() foreach @$run;
}
sub later ($) {
$laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later);
}
-END {
- event_step();
- _run_all($nextq);
- _run_all($laterq);
-}
-
+END { _run_later() }
1;
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(httpd env rbuf input_left remote_addr remote_port forward pull);
+use fields qw(httpd env input_left remote_addr remote_port forward pull);
use bytes (); # only for bytes::length
use Fcntl qw(:seek);
use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl
use Errno qw(EAGAIN);
my $pipelineq = [];
-my $pipet;
sub process_pipelineq () {
my $q = $pipelineq;
- $pipet = undef;
$pipelineq = [];
foreach (@$q) {
next unless $_->{sock};
sub new ($$$) {
my ($class, $sock, $addr, $httpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
+ my $ev = EPOLLIN;
+ my $wbuf;
+ if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ return CORE::close($sock) if $! != EAGAIN;
+ $ev = PublicInbox::TLS::epollbit();
+ $wbuf = [ \&PublicInbox::DS::accept_tls_step ];
+ }
+ $self->SUPER::new($sock, $ev | EPOLLONESHOT);
$self->{httpd} = $httpd;
- $self->{rbuf} = '';
+ $self->{wbuf} = $wbuf if $wbuf;
($self->{remote_addr}, $self->{remote_port}) =
PublicInbox::Daemon::host_with_port($addr);
$self;
# otherwise we can be buffering infinitely w/o backpressure
return read_input($self) if defined $self->{env};
- my $rbuf = \($self->{rbuf});
- my $off = bytes::length($$rbuf);
- $self->do_read($rbuf, 8192, $off) and rbuf_process($self);
+ my $rbuf = $self->{rbuf} // (\(my $x = ''));
+ $self->do_read($rbuf, 8192, bytes::length($$rbuf)) or return;
+ rbuf_process($self, $rbuf);
}
sub rbuf_process {
- my ($self) = @_;
+ my ($self, $rbuf) = @_;
+ $rbuf //= $self->{rbuf} // (\(my $x = ''));
my %env = %{$self->{httpd}->{env}}; # full hash copy
- my $r = parse_http_request($self->{rbuf}, \%env);
+ my $r = parse_http_request($$rbuf, \%env);
# We do not support Trailers in chunked requests, for now
# (they are rarely-used and git (as of 2.7.2) does not use them)
if ($r == -1 || $env{HTTP_TRAILER} ||
# this length-check is necessary for PURE_PERL=1:
- ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) {
+ ($r == -2 && bytes::length($$rbuf) > 0x4000)) {
return quit($self, 400);
}
- return $self->watch_in1 if $r < 0; # incomplete
- $self->{rbuf} = substr($self->{rbuf}, $r);
-
+ if ($r < 0) { # incomplete
+ $self->rbuf_idle($rbuf);
+ return $self->requeue;
+ }
+ $$rbuf = substr($$rbuf, $r);
my $len = input_prepare($self, \%env);
defined $len or return write_err($self, undef); # EMFILE/ENFILE
- $len ? read_input($self) : app_dispatch($self);
+ $len ? read_input($self, $rbuf) : app_dispatch($self, undef, $rbuf);
}
# IO::Handle::write returns boolean, this returns bytes written:
$w;
}
-sub read_input ($) {
- my ($self) = @_;
+sub read_input ($;$) {
+ my ($self, $rbuf) = @_;
+ $rbuf //= $self->{rbuf} // (\(my $x = ''));
my $env = $self->{env};
return if $env->{REMOTE_ADDR}; # in app dispatch
- return read_input_chunked($self) if env_chunked($env);
+ return read_input_chunked($self, $rbuf) if env_chunked($env);
# env->{CONTENT_LENGTH} (identity)
- my $sock = $self->{sock};
my $len = delete $self->{input_left};
- my $rbuf = \($self->{rbuf});
my $input = $env->{'psgi.input'};
while ($len > 0) {
}
$$rbuf = '';
}
- my $r = sysread($sock, $$rbuf, 8192);
- return recv_err($self, $r, $len) unless $r;
+ $self->do_read($rbuf, 8192) or return recv_err($self, $len);
# continue looping if $r > 0;
}
- app_dispatch($self, $input);
+ app_dispatch($self, $input, $rbuf);
}
sub app_dispatch {
- my ($self, $input) = @_;
+ my ($self, $input, $rbuf) = @_;
+ $self->rbuf_idle($rbuf);
my $env = $self->{env};
$env->{REMOTE_ADDR} = $self->{remote_addr};
$env->{REMOTE_PORT} = $self->{remote_port};
sub next_request ($) {
my ($self) = @_;
- if ($self->{rbuf} eq '') { # wait for next request
- $self->watch_in1;
- } else { # avoid recursion for pipelined requests
+ if ($self->{rbuf}) {
+ # avoid recursion for pipelined requests
+ PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq;
push @$pipelineq, $self;
- $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
+ } else { # wait for next request
+ $self->requeue;
}
}
if ($self->{wbuf}) {
$self->write($next);
} else {
- PublicInbox::EvCleanup::asap($next);
+ PublicInbox::DS::requeue($next);
}
return;
}
}
sub recv_err {
- my ($self, $r, $len) = @_;
- return $self->close if (defined $r && $r == 0);
- if ($! == EAGAIN) {
+ my ($self, $len) = @_;
+ if ($! == EAGAIN) { # epoll/kevent watch already set by do_read
$self->{input_left} = $len;
- return $self->watch_in1;
+ } else {
+ err($self, "error reading input: $! ($len bytes remaining)");
}
- err($self, "error reading for input: $! ($len bytes remaining)");
- quit($self, 500);
}
sub read_input_chunked { # unlikely...
- my ($self) = @_;
+ my ($self, $rbuf) = @_;
+ $rbuf //= $self->{rbuf} // (\(my $x = ''));
my $input = $self->{env}->{'psgi.input'};
- my $sock = $self->{sock};
my $len = delete $self->{input_left};
- my $rbuf = \($self->{rbuf});
while (1) { # chunk start
if ($len == CHUNK_ZEND) {
$$rbuf =~ s/\A\r\n//s and
- return app_dispatch($self, $input);
+ return app_dispatch($self, $input, $rbuf);
+
return quit($self, 400) if bytes::length($$rbuf) > 2;
}
if ($len == CHUNK_END) {
}
if ($len < 0) { # chunk header is trickled, read more
- my $off = bytes::length($$rbuf);
- my $r = sysread($sock, $$rbuf, 8192, $off);
- return recv_err($self, $r, $len) unless $r;
+ $self->do_read($rbuf, 8192, bytes::length($$rbuf)) or
+ return recv_err($self, $len);
# (implicit) goto chunk_start if $r > 0;
}
$len = CHUNK_ZEND if $len == 0;
}
if ($$rbuf eq '') {
# read more of current chunk
- my $r = sysread($sock, $$rbuf, 8192);
- return recv_err($self, $r, $len) unless $r;
+ $self->do_read($rbuf, 8192) or
+ return recv_err($self, $len);
}
}
}
# for graceful shutdown in PublicInbox::Daemon:
sub busy () {
my ($self) = @_;
- ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
+ ($self->{rbuf} || $self->{env} || $self->{wbuf});
}
-# fires after pending writes are complete:
-sub restart_pass ($) {
- $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
-}
-
-sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
-
1;
# XXX This is a totally unstable API for public-inbox internal use only
# This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
# The name of this key is not even stable!
-# Currently is is intended for use with read-only pipes.
+# Currently intended for use with read-only pipes with expensive
+# processes such as git-http-backend(1), cgit(1)
package PublicInbox::HTTPD::Async;
use strict;
use warnings;
use base qw(PublicInbox::DS);
use fields qw(cb cleanup);
-require PublicInbox::EvCleanup;
use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
sub new {
my ($class, $io, $cb, $cleanup) = @_;
# no $io? call $cb at the top of the next event loop to
# avoid recursion:
unless (defined($io)) {
- PublicInbox::EvCleanup::asap($cb) if $cb;
- PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+ PublicInbox::DS::requeue($cb);
+ die 'cleanup unsupported w/o $io' if $cleanup;
return;
}
my $self = fields::new($class);
IO::Handle::blocking($io, 0);
- $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($io, EPOLLIN | EPOLLET);
$self->{cb} = $cb;
$self->{cleanup} = $cleanup;
$self;
}
-sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
-
-sub main_cb ($$$) {
- my ($http, $fh, $bref) = @_;
+sub main_cb ($$) {
+ my ($http, $fh) = @_;
sub {
my ($self) = @_;
- my $r = sysread($self->{sock}, $$bref, 8192);
+ my $r = sysread($self->{sock}, my $buf, 65536);
if ($r) {
- $fh->write($$bref); # may call $http->close
-
+ $fh->write($buf); # may call $http->close
if ($http->{sock}) { # !closed
- if ($http->{wbuf}) {
- # HTTP client could not keep up, so
- # stop reading and buffering.
- $self->watch(0);
-
- # Tell the HTTP socket to restart us
- # when HTTP client is done draining
- # $http->{wbuf}:
- $http->enqueue_restart_pass;
- }
- # stay in EPOLLIN, but let other clients
- # get some work done, too.
+ $self->requeue;
+ # let other clients get some work done, too
return;
}
- # fall through to close below...
- } elsif (!defined $r) {
- return restart_read($self) if $! == EAGAIN;
+
+ # else: fall through to close below...
+ } elsif (!defined $r && $! == EAGAIN) {
+ return; # EPOLLET means we'll be notified
}
# Done! Error handling will happen in $fh->close
# will automatically close this ($self) object.
$http->{forward} = $self;
$fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
- $self->{cb} = main_cb($http, $fh, $bref);
+ $$bref = undef; # we're done with this
+ my $cb = $self->{cb} = main_cb($http, $fh);
+ $cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy
}
-sub event_step { $_[0]->{cb}->(@_) }
+sub event_step {
+ # {cb} may be undef after ->requeue due to $http->close happening
+ my $cb = $_[0]->{cb} or return;
+ $cb->(@_);
+}
sub close {
my $self = $_[0];
# we defer this to the next timer loop since close is deferred
if (my $cleanup = delete $self->{cleanup}) {
- PublicInbox::EvCleanup::next_tick($cleanup);
+ PublicInbox::DS::requeue($cleanup);
}
}
use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
use fields qw(post_accept);
require IO::Handle;
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
sub new ($$$) {
my ($class, $s, $cb) = @_;
listen($s, 1024);
IO::Handle::blocking($s, 0);
my $self = fields::new($class);
- $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()|
- PublicInbox::DS::EPOLLEXCLUSIVE());
+ $self->SUPER::new($s, EPOLLIN|EPOLLET|EPOLLEXCLUSIVE);
$self->{post_accept} = $cb;
$self
}
sub event_step {
my ($self) = @_;
- my $sock = $self->{sock};
+ my $sock = $self->{sock} or return;
# no loop here, we want to fairly distribute clients
# between multiple processes sharing the same socket
if (my $addr = accept(my $c, $sock)) {
IO::Handle::blocking($c, 0); # no accept4 :<
$self->{post_accept}->($c, $addr, $sock);
+ $self->requeue;
}
}
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng);
+use fields qw(nntpd article ng);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
my $EXPMAP; # fd -> [ idle_time, $self ]
my $expt;
our $EXPTIME = 180; # 3 minutes
-my $nextt;
-
-my $nextq = [];
-sub next_tick () {
- $nextt = undef;
- my $q = $nextq;
- $nextq = [];
- event_step($_) for @$q;
-}
-
-sub requeue ($) {
- push @$nextq, $_[0];
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-}
sub update_idle_time ($) {
my ($self) = @_;
my $exp = $EXPTIME;
my $old = $now - $exp;
my $nr = 0;
- my $closed = 0;
my %new;
while (my ($fd, $v) = each %$EXPMAP) {
my ($idle_time, $nntp) = @$v;
if ($idle_time < $old) {
- if ($nntp->shutdn) {
- $closed++;
- } else {
+ if (!$nntp->shutdn) {
++$nr;
$new{$fd} = $v;
}
}
}
$EXPMAP = \%new;
- if ($nr) {
- $expt = PublicInbox::EvCleanup::later(*expire_old);
- } else {
- $expt = undef;
- # noop to kick outselves out of the loop ASAP so descriptors
- # really get closed
- PublicInbox::EvCleanup::asap(sub {}) if $closed;
- }
+ $expt = PublicInbox::EvCleanup::later(*expire_old) if $nr;
}
sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
my $ev = EPOLLIN;
my $wbuf;
if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
- $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+ return CORE::close($sock) if $! != EAGAIN;
+ $ev = PublicInbox::TLS::epollbit();
$wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
}
$self->SUPER::new($sock, $ev | EPOLLONESHOT);
push @$wbuf, $long_cb;
# wbuf may be populated by $cb, no need to rearm if so:
- requeue($self) if scalar(@$wbuf) == 1;
+ $self->requeue if scalar(@$wbuf) == 1;
} else { # all done!
$long_cb = undef;
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
- requeue($self) unless $self->{wbuf};
+ $self->requeue unless $self->{wbuf};
}
};
$self->write($long_cb); # kick off!
return '580 can not initiate TLS negotiation';
res($self, '382 Continue with TLS negotiation');
$self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
- requeue($self) if PublicInbox::DS::accept_tls_step($self);
+ $self->requeue if PublicInbox::DS::accept_tls_step($self);
undef;
}
return $self->close if $r < 0;
my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
- if ($len) {
- $self->{rbuf} = $rbuf;
- } else {
- delete $self->{rbuf};
- }
+ $self->rbuf_idle($rbuf);
update_idle_time($self);
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- requeue($self) unless $self->{wbuf};
+ $self->requeue unless $self->{wbuf};
}
sub not_idle_long ($$) {
# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# only for PublicInbox::Daemon
+
+# only for PublicInbox::Daemon, allows worker processes to be
+# notified if the master process dies.
package PublicInbox::ParentPipe;
use strict;
use warnings;
use base qw(PublicInbox::DS);
use fields qw(cb);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
sub new ($$$) {
- my ($class, $pipe, $cb) = @_;
+ my ($class, $pipe, $worker_quit) = @_;
my $self = fields::new($class);
- $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN());
- $self->{cb} = $cb;
+ $self->SUPER::new($pipe, EPOLLIN|EPOLLONESHOT);
+ $self->{cb} = $worker_quit;
$self;
}
-sub event_step { $_[0]->{cb}->($_[0]) }
+# master process died, time to call worker_quit ourselves
+sub event_step {
+ $_[0]->close; # PublicInbox::DS::close
+ $_[0]->{cb}->();
+}
1;
my $rpipe; # comes from popen_rd
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
- my $r = sysread($rpipe, my $buf, 8192);
+ my $r = sysread($rpipe, my $buf, 65536);
if ($async) {
$async->async_pass($env->{'psgix.io'}, $qx, \$buf);
} elsif (defined $r) {
$VERSION = "0.25";
@ISA = qw(Exporter);
@EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait
- EPOLLIN EPOLLOUT
+ EPOLLIN EPOLLOUT EPOLLET
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
EPOLLONESHOT EPOLLEXCLUSIVE);
%EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
# use constant EPOLLRDBAND => 128;
use constant EPOLLEXCLUSIVE => (1 << 28);
use constant EPOLLONESHOT => (1 << 30);
-# use constant EPOLLET => (1 << 31);
+use constant EPOLLET => (1 << 31);
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;
use constant EPOLL_CTL_MOD => 3;
# returns the EPOLL event bit which matches the existing SSL error
sub epollbit () {
- if ($! == EAGAIN) {
- return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
- return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
- die "unexpected SSL error: $SSL_ERROR";
- }
- 0;
+ return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
+ return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
+ die "unexpected SSL error: $SSL_ERROR";
}
1;
if (!ref($inboxes) && $inboxes eq 'watchspam') {
return _remove_spam($self, $path);
}
+
+ my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+ local $SIG{__WARN__} = sub {
+ $warn_cb->("path: $path\n");
+ $warn_cb->(@_);
+ };
foreach my $ibx (@$inboxes) {
my $mime = _path_to_mime($path) or next;
my $im = _importer_for($self, $ibx);
PublicInbox::DS->SetLoopTimeout(0);
PublicInbox::DS->SetPostLoopCallback(sub { 0 });
+
+ # make sure execve closes if we're using fork()
+ my ($r, $w);
+ pipe($r, $w) or die "pipe: $!";
+
PublicInbox::DS->AddTimer(0, sub { $pid = spawn([qw(sleep 10)]) });
PublicInbox::DS->EventLoop;
ok($pid, 'subprocess spawned');
+
+ # wait for execve, we need to ensure lsof sees sleep(1)
+ # and not the fork of this process:
+ close $w or die "close: $!";
+ my $l = <$r>;
+ is($l, undef, 'cloexec works and sleep(1) is running');
+
my @of = grep(/$evfd_re/, `lsof -p $pid 2>/dev/null`);
my $err = $?;
SKIP: {
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw(tempdir);
+use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET);
+# IO::Poll is part of the standard library, but distros may split them off...
+foreach my $mod (qw(IO::Socket::SSL IO::Poll)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for $0" if $@;
+}
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+unless (-r $key && -r $cert) {
+ plan skip_all =>
+ "certs/ missing for $0, run ./create-certs.perl in certs/";
+}
+use_ok 'PublicInbox::TLS';
+use_ok 'IO::Socket::SSL';
+require './t/common.perl';
+my $psgi = "./t/httpd-corner.psgi";
+my $tmpdir = tempdir('pi-httpd-https-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $httpd = 'blib/script/public-inbox-httpd';
+my %opts = (
+ LocalAddr => '127.0.0.1',
+ ReuseAddr => 1,
+ Proto => 'tcp',
+ Type => SOCK_STREAM,
+ Listen => 1024,
+);
+my $https = IO::Socket::INET->new(%opts);
+my ($pid, $tail_pid);
+END {
+ foreach ($pid, $tail_pid) {
+ kill 'TERM', $_ if defined $_;
+ }
+};
+my $https_addr = $https->sockhost . ':' . $https->sockport;
+my %opt = ( Proto => 'tcp', PeerAddr => $https_addr, Type => SOCK_STREAM );
+
+for my $args (
+ [ "-lhttps://$https_addr/?key=$key,cert=$cert" ],
+) {
+ for ($out, $err) {
+ open my $fh, '>', $_ or die "truncate: $!";
+ }
+ if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+ $tail_pid = fork;
+ if (defined $tail_pid && $tail_pid == 0) {
+ exec(split(' ', $tail_cmd), $out, $err);
+ }
+ }
+ my $cmd = [ $httpd, '-W0', @$args,
+ "--stdout=$out", "--stderr=$err", $psgi ];
+ $pid = spawn_listener(undef, $cmd, [ $https ]);
+ my %o = (
+ SSL_hostname => 'server.local',
+ SSL_verifycn_name => 'server.local',
+ SSL_verify_mode => SSL_VERIFY_PEER(),
+ SSL_ca_file => 'certs/test-ca.pem',
+ );
+ # start negotiating a slow TLS connection
+ my $slow = IO::Socket::INET->new(%opt, Blocking => 0);
+ $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o);
+ my @poll = (fileno($slow));
+ my $slow_done = $slow->connect_SSL;
+ if ($slow_done) {
+ diag('W: connect_SSL early OK, slow client test invalid');
+ push @poll, PublicInbox::Syscall::EPOLLOUT();
+ } else {
+ push @poll, PublicInbox::TLS::epollbit();
+ }
+
+ # normal HTTPS
+ my $c = IO::Socket::INET->new(%opt);
+ IO::Socket::SSL->start_SSL($c, %o);
+ ok($c->print("GET /empty HTTP/1.1\r\n\r\nHost: example.com\r\n\r\n"),
+ 'wrote HTTP request');
+ my $buf = '';
+ sysread($c, $buf, 2007, length($buf)) until $buf =~ /\r\n\r\n/;
+ like($buf, qr!\AHTTP/1\.1 200!, 'read HTTP response');
+
+ # HTTPS with bad hostname
+ $c = IO::Socket::INET->new(%opt);
+ $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.fail';
+ $c = IO::Socket::SSL->start_SSL($c, %o);
+ is($c, undef, 'HTTPS fails with bad hostname');
+
+ $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local';
+ $c = IO::Socket::INET->new(%opt);
+ IO::Socket::SSL->start_SSL($c, %o);
+ ok($c, 'HTTPS succeeds again with valid hostname');
+
+ # slow TLS connection did not block the other fast clients while
+ # connecting, finish it off:
+ until ($slow_done) {
+ IO::Poll::_poll(-1, @poll);
+ $slow_done = $slow->connect_SSL and last;
+ @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+ }
+ $slow->blocking(1);
+ ok($slow->print("GET /empty HTTP/1.1\r\n\r\nHost: example.com\r\n\r\n"),
+ 'wrote HTTP request from slow');
+ $buf = '';
+ sysread($slow, $buf, 666, length($buf)) until $buf =~ /\r\n\r\n/;
+ like($buf, qr!\AHTTP/1\.1 200!, 'read HTTP response from slow');
+ $slow = undef;
+
+ SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($https, IPPROTO_TCP, $var)) or die;
+ ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on https');
+ };
+ SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd';
+ if (system('kldstat -m accf_data >/dev/null')) {
+ skip 'accf_data not loaded? kldload accf_data', 2;
+ }
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ my $x = getsockopt($https, SOL_SOCKET, $var);
+ like($x, qr/\Adataready\0+\z/, 'got dataready accf for https');
+ };
+
+ $c = undef;
+ kill('TERM', $pid);
+ is($pid, waitpid($pid, 0), 'httpd exited successfully');
+ is($?, 0, 'no error in exited process');
+ $pid = undef;
+ if (defined $tail_pid) {
+ kill 'TERM', $tail_pid;
+ waitpid($tail_pid, 0);
+ $tail_pid = undef;
+ }
+}
+done_testing();
+1;
eval "require $mod";
plan skip_all => "$mod missing for $0" if $@;
}
+Net::NNTP->can('starttls') or
+ plan skip_all => 'Net::NNTP does not support TLS';
+
my $cert = 'certs/server-cert.pem';
my $key = 'certs/server-key.pem';
unless (-r $key && -r $cert) {
is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works');
is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works');
is_deeply($n->listgroup($group), [1], 'listgroup OK');
- ok(!$n->starttls, 'STARTTLS fails when unconfigured');
- is($n->code, 580, 'got 580 code on server w/o TLS');
+
+ SKIP: {
+ $n->can('starttls') or
+ skip('Net::NNTP too old to support STARTTLS', 2);
+ ok(!$n->starttls, 'STARTTLS fails when unconfigured');
+ is($n->code, 580, 'got 580 code on server w/o TLS');
+ };
%opts = (
PeerAddr => $host_port,