From: Eric Wong Date: Sun, 30 Jun 2019 17:13:30 +0000 (+0000) Subject: Merge remote-tracking branch 'origin/email-simple-mem' into master X-Git-Tag: v1.2.0~135 X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=commitdiff_plain;h=ecea327e3d4386a22652fc08f71ac7d65b8f9b70;hp=ea71a5606c633f82975e8208a6c552053f7f5af8 Merge remote-tracking branch 'origin/email-simple-mem' into master * origin/email-simple-mem: nntp: reduce syscalls for ARTICLE and BODY mbox: split header and body processing mbox: use Email::Simple->new to do in-place modifications nntp: rework and simplify art_lookup response --- diff --git a/MANIFEST b/MANIFEST index 29920953..4cb5f38f 100644 --- a/MANIFEST +++ b/MANIFEST @@ -210,6 +210,7 @@ t/hl_mod.t t/html_index.t t/httpd-corner.psgi t/httpd-corner.t +t/httpd-https.t t/httpd-unix.t t/httpd.t t/hval.t diff --git a/examples/public-inbox-httpd@.service b/examples/public-inbox-httpd@.service index 56117ef0..e811da40 100644 --- a/examples/public-inbox-httpd@.service +++ b/examples/public-inbox-httpd@.service @@ -20,7 +20,12 @@ ExecStartPre = /bin/mkdir -p -m 1777 /tmp/.pub-inline ExecStart = /usr/local/bin/public-inbox-httpd \ -1 /var/log/public-inbox/httpd.out.log StandardError = syslog + +# NonBlocking is REQUIRED to avoid a race condition if running +# simultaneous services +NonBlocking = true Sockets = public-inbox-httpd.socket + KillSignal = SIGQUIT User = nobody Group = nogroup diff --git a/examples/public-inbox-nntpd@.service b/examples/public-inbox-nntpd@.service index 62202c2f..a879841e 100644 --- a/examples/public-inbox-nntpd@.service +++ b/examples/public-inbox-nntpd@.service @@ -20,7 +20,12 @@ ExecStartPre = /bin/mkdir -p -m 1777 /tmp/.pub-inline ExecStart = /usr/local/bin/public-inbox-nntpd \ -1 /var/log/public-inbox/nntpd.out.log StandardError = syslog + +# NonBlocking is REQUIRED to avoid a race condition if running +# simultaneous services +NonBlocking = true Sockets = public-inbox-nntpd.socket + KillSignal = SIGQUIT User = nobody Group = nogroup diff --git a/examples/unsubscribe-psgi@.service b/examples/unsubscribe-psgi@.service index acc29e8e..c8721fbe 100644 --- a/examples/unsubscribe-psgi@.service +++ b/examples/unsubscribe-psgi@.service @@ -12,7 +12,12 @@ After = unsubscribe-psgi.socket # any PSGI server ought to work, # but public-inbox-httpd supports socket activation like unsubscribe.milter ExecStart = /usr/local/bin/public-inbox-httpd -W0 /etc/unsubscribe.psgi + +# NonBlocking is REQUIRED to avoid a race condition if running +# simultaneous services +NonBlocking = true Sockets = unsubscribe-psgi.socket + # we need to modify the mlmmj spool User = mlmmj KillMode = process diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index a8700bc5..586c47cd 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -28,6 +28,7 @@ use 5.010_001; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket + 'rbuf', # scalarref, usually undef 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at ); @@ -36,6 +37,7 @@ use Errno qw(EAGAIN EINVAL EEXIST); use Carp qw(croak confess carp); require File::Spec; +my $nextq = []; # queue for next_tick our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) @@ -98,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t sub AddTimer { my ($class, $secs, $coderef) = @_; - if (!$secs) { - my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer'); - unshift(@Timers, $timer); - return $timer; - } - my $fire_time = now() + $secs; my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; @@ -173,9 +169,23 @@ sub FirstTimeEventLoop { sub now () { clock_gettime(CLOCK_MONOTONIC) } +sub next_tick () { + my $q = $nextq; + $nextq = []; + for (@$q) { + if (ref($_) eq 'CODE') { + $_->(); + } else { + $_->event_step; + } + } +} + # runs timers and returns milliseconds for next one, or next event loop sub RunTimers { - return $LoopTimeout unless @Timers; + next_tick(); + + return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers; my $now = now(); @@ -185,6 +195,9 @@ sub RunTimers { $to_run->[1]->($now) if $to_run->[1]; } + # timers may enqueue into nextq: + return 0 if (@$nextq || @ToClose); + return $LoopTimeout unless @Timers; # convert time to an even number of milliseconds, adding 1 @@ -246,17 +259,8 @@ sub PostEventLoop { # now we can close sockets that wanted to close during our event processing. # (we didn't want to close them during the loop, as we didn't want fd numbers # being reused and confused during the event loop) - while (my $sock = shift @ToClose) { - my $fd = fileno($sock); - - # close the socket. (not a PublicInbox::DS close) - CORE::close($sock); - - # and now we can finally remove the fd from the map. see - # comment above in ->close. - delete $DescriptorMap{$fd}; - } - + delete($DescriptorMap{fileno($_)}) for @ToClose; + @ToClose = (); # let refcounting drop everything all at once # by default we keep running, unless a postloop callback (either per-object # or global) cancels it @@ -317,6 +321,8 @@ sub new { ### I N S T A N C E M E T H O D S ##################################################################### +sub requeue ($) { push @$nextq, $_[0] } + =head2 C<< $obj->close >> Close the socket. @@ -373,6 +379,10 @@ sub psendfile ($$$) { $written; } +sub epbit ($$) { # (sock, default) + ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1]; +} + # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; @@ -391,8 +401,8 @@ next_buf: goto next_buf; } } elsif ($! == EAGAIN) { + epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); $self->{wbuf_off} = $off; - watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } else { return $self->close; @@ -412,17 +422,25 @@ next_buf: 1; # all done } -sub do_read ($$$$) { +sub rbuf_idle ($$) { + my ($self, $rbuf) = @_; + if ($$rbuf eq '') { # who knows how long till we can read again + delete $self->{rbuf}; + } else { + $self->{rbuf} = $rbuf; + } +} + +sub do_read ($$$;$) { my ($self, $rbuf, $len, $off) = @_; - my $r = sysread($self->{sock}, $$rbuf, $len, $off); + my $r = sysread(my $sock = $self->{sock}, $$rbuf, $len, $off // 0); return ($r == 0 ? $self->close : $r) if defined $r; # common for clients to break connections without warning, # would be too noisy to log here: - if (ref($self) eq 'IO::Socket::SSL') { - my $ev = PublicInbox::TLS::epollbit() or return $self->close; - watch($self, $ev | EPOLLONESHOT); - } elsif ($! == EAGAIN) { - watch($self, EPOLLIN | EPOLLONESHOT); + if ($! == EAGAIN) { + epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT); + rbuf_idle($self, $rbuf); + 0; } else { $self->close; } @@ -499,17 +517,20 @@ sub write { if (defined $written) { return 1 if $written == $to_write; + requeue($self); # runs: event_step -> flush_write } elsif ($! == EAGAIN) { + epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); $written = 0; } else { return $self->close; } + + # deal with EAGAIN or partial write: my $tmpio = tmpio($self, $bref, $written) or return 0; # wbuf may be an empty array if we're being called inside # ->flush_write via CODE bref: push @{$self->{wbuf} ||= []}, $tmpio; - watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } } @@ -528,46 +549,39 @@ sub msg_more ($$) { # queue up the unwritten substring: my $tmpio = tmpio($self, \($_[1]), $n) or return 0; $self->{wbuf} = [ $tmpio ]; - watch($self, EPOLLOUT|EPOLLONESHOT); + epwait($sock, EPOLLOUT|EPOLLONESHOT); return 0; } } $self->write(\($_[1])); } -sub watch ($$) { - my ($self, $ev) = @_; - my $sock = $self->{sock} or return; +sub epwait ($$) { + my ($sock, $ev) = @_; epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and confess("EPOLL_CTL_MOD $!"); - 0; } -sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } - # return true if complete, false if incomplete (or failure) sub accept_tls_step ($) { my ($self) = @_; my $sock = $self->{sock} or return; return 1 if $sock->accept_SSL; return $self->close if $! != EAGAIN; - if (my $ev = PublicInbox::TLS::epollbit()) { - unshift @{$self->{wbuf} ||= []}, \&accept_tls_step; - return watch($self, $ev | EPOLLONESHOT); - } - drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); + epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); + unshift @{$self->{wbuf} ||= []}, \&accept_tls_step; + 0; } +# return true if complete, false if incomplete (or failure) sub shutdn_tls_step ($) { my ($self) = @_; my $sock = $self->{sock} or return; return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); return $self->close if $! != EAGAIN; - if (my $ev = PublicInbox::TLS::epollbit()) { - unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step; - return watch($self, $ev | EPOLLONESHOT); - } - drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); + epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); + unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step; + 0; } # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC @@ -581,7 +595,6 @@ sub shutdn ($) { $self->close; } } - package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel { diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm index 364df3d6..1c3b970b 100644 --- a/lib/PublicInbox/DSKQXS.pm +++ b/lib/PublicInbox/DSKQXS.pm @@ -16,7 +16,8 @@ use warnings; use parent qw(IO::KQueue); use parent qw(Exporter); use IO::KQueue; -use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL); +use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET + EPOLL_CTL_ADD EPOLL_CTL_MOD EPOLL_CTL_DEL); our @EXPORT_OK = qw(epoll_ctl epoll_wait); my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec) @@ -24,10 +25,15 @@ my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec) sub kq_flag ($$) { my ($bit, $ev) = @_; if ($ev & $bit) { - my $fl = EV_ADD | EV_ENABLE; - ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl; + my $fl = EV_ENABLE; + $fl |= EV_CLEAR if $fl & EPOLLET; + + # EV_DISPATCH matches EPOLLONESHOT semantics more closely + # than EV_ONESHOT, in that EV_ADD is not required to + # re-enable a disabled watch. + ($ev & EPOLLONESHOT) ? ($fl | EV_DISPATCH) : $fl; } else { - EV_ADD | EV_DISABLE; + EV_DISABLE; } } @@ -40,9 +46,15 @@ sub new { sub epoll_ctl { my ($self, $op, $fd, $ev) = @_; - if ($op != EPOLL_CTL_DEL) { + if ($op == EPOLL_CTL_MOD) { $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev)); $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev)); + } elsif ($op == EPOLL_CTL_DEL) { + $self->EV_SET($fd, EVFILT_READ, EV_DISABLE); + $self->EV_SET($fd, EVFILT_WRITE, EV_DISABLE); + } else { + $self->EV_SET($fd, EVFILT_READ, EV_ADD|kq_flag(EPOLLIN, $ev)); + $self->EV_SET($fd, EVFILT_WRITE, EV_ADD|kq_flag(EPOLLOUT, $ev)); } 0; } diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index cf011a20..2b7ac266 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -252,13 +252,11 @@ sub daemonize () { sub worker_quit { - my ($reason) = @_; # killing again terminates immediately: exit unless @listeners; $_->close foreach @listeners; # call PublicInbox::DS::close @listeners = (); - $reason->close if ref($reason) eq 'PublicInbox::ParentPipe'; my $proc_name; my $warn = 0; @@ -590,7 +588,7 @@ sub daemon_loop ($$$$) { } else { reopen_logs(); $set_user->() if $set_user; - $SIG{USR2} = sub { worker_quit('USR2') if upgrade() }; + $SIG{USR2} = sub { worker_quit() if upgrade() }; $refresh->(); } $uid = $gid = undef; diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index 33b54ebc..be6672ed 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -1,80 +1,23 @@ -# Copyright (C) 2016-2018 all contributors +# Copyright (C) 2016-2019 all contributors # License: AGPL-3.0+ -# event cleanups (currently for PublicInbox::DS) +# event cleanups (for PublicInbox::DS) package PublicInbox::EvCleanup; use strict; use warnings; -use base qw(PublicInbox::DS); -use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); +require PublicInbox::DS; +# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers my $ENABLED; sub enabled { $ENABLED } sub enable { $ENABLED = 1 } -my $singleton; -my $asapq = [ [], undef ]; -my $nextq = [ [], undef ]; my $laterq = [ [], undef ]; -sub once_init () { - my $self = fields::new('PublicInbox::EvCleanup'); - my ($r, $w); - - # This is a dummy pipe which is always writable so it can always - # fires in the next event loop iteration. - pipe($r, $w) or die "pipe: $!"; - fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ - $self->SUPER::new($w, 0); - - # always writable, since PublicInbox::EvCleanup::event_step - # never drains wbuf. We can avoid wasting a hash slot by - # stuffing the read-end of the pipe into the never-to-be-touched - # wbuf - $self->{wbuf} = $r; - $self; -} - -sub _run_all ($) { - my ($q) = @_; - - my $run = $q->[0]; - $q->[0] = []; - $q->[1] = undef; - $_->() foreach @$run; -} - -# ensure PublicInbox::DS::ToClose processing after timers fire -sub _asap_close () { $asapq->[1] ||= _asap_timer() } - -# Called by PublicInbox::DS -sub event_step { _run_all($asapq) } - -sub _run_next () { - _run_all($nextq); - _asap_close(); -} - sub _run_later () { - _run_all($laterq); - _asap_close(); -} - -sub _asap_timer () { - $singleton ||= once_init(); - $singleton->watch(EPOLLOUT|EPOLLONESHOT); - 1; -} - -sub asap ($) { - my ($cb) = @_; - push @{$asapq->[0]}, $cb; - $asapq->[1] ||= _asap_timer(); -} - -sub next_tick ($) { - my ($cb) = @_; - push @{$nextq->[0]}, $cb; - $nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next); + my $run = $laterq->[0]; + $laterq->[0] = []; + $laterq->[1] = undef; + $_->() foreach @$run; } sub later ($) { @@ -83,10 +26,5 @@ sub later ($) { $laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later); } -END { - event_step(); - _run_all($nextq); - _run_all($laterq); -} - +END { _run_later() } 1; diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index a1cb4aca..5546ac46 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -11,7 +11,7 @@ package PublicInbox::HTTP; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(httpd env rbuf input_left remote_addr remote_port forward pull); +use fields qw(httpd env input_left remote_addr remote_port forward pull); use bytes (); # only for bytes::length use Fcntl qw(:seek); use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl @@ -30,10 +30,8 @@ use constant { use Errno qw(EAGAIN); my $pipelineq = []; -my $pipet; sub process_pipelineq () { my $q = $pipelineq; - $pipet = undef; $pipelineq = []; foreach (@$q) { next unless $_->{sock}; @@ -58,9 +56,16 @@ sub http_date () { sub new ($$$) { my ($class, $sock, $addr, $httpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); + my $ev = EPOLLIN; + my $wbuf; + if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) { + return CORE::close($sock) if $! != EAGAIN; + $ev = PublicInbox::TLS::epollbit(); + $wbuf = [ \&PublicInbox::DS::accept_tls_step ]; + } + $self->SUPER::new($sock, $ev | EPOLLONESHOT); $self->{httpd} = $httpd; - $self->{rbuf} = ''; + $self->{wbuf} = $wbuf if $wbuf; ($self->{remote_addr}, $self->{remote_port}) = PublicInbox::Daemon::host_with_port($addr); $self; @@ -75,31 +80,34 @@ sub event_step { # called by PublicInbox::DS # otherwise we can be buffering infinitely w/o backpressure return read_input($self) if defined $self->{env}; - my $rbuf = \($self->{rbuf}); - my $off = bytes::length($$rbuf); - $self->do_read($rbuf, 8192, $off) and rbuf_process($self); + my $rbuf = $self->{rbuf} // (\(my $x = '')); + $self->do_read($rbuf, 8192, bytes::length($$rbuf)) or return; + rbuf_process($self, $rbuf); } sub rbuf_process { - my ($self) = @_; + my ($self, $rbuf) = @_; + $rbuf //= $self->{rbuf} // (\(my $x = '')); my %env = %{$self->{httpd}->{env}}; # full hash copy - my $r = parse_http_request($self->{rbuf}, \%env); + my $r = parse_http_request($$rbuf, \%env); # We do not support Trailers in chunked requests, for now # (they are rarely-used and git (as of 2.7.2) does not use them) if ($r == -1 || $env{HTTP_TRAILER} || # this length-check is necessary for PURE_PERL=1: - ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) { + ($r == -2 && bytes::length($$rbuf) > 0x4000)) { return quit($self, 400); } - return $self->watch_in1 if $r < 0; # incomplete - $self->{rbuf} = substr($self->{rbuf}, $r); - + if ($r < 0) { # incomplete + $self->rbuf_idle($rbuf); + return $self->requeue; + } + $$rbuf = substr($$rbuf, $r); my $len = input_prepare($self, \%env); defined $len or return write_err($self, undef); # EMFILE/ENFILE - $len ? read_input($self) : app_dispatch($self); + $len ? read_input($self, $rbuf) : app_dispatch($self, undef, $rbuf); } # IO::Handle::write returns boolean, this returns bytes written: @@ -111,16 +119,15 @@ sub xwrite ($$$) { $w; } -sub read_input ($) { - my ($self) = @_; +sub read_input ($;$) { + my ($self, $rbuf) = @_; + $rbuf //= $self->{rbuf} // (\(my $x = '')); my $env = $self->{env}; return if $env->{REMOTE_ADDR}; # in app dispatch - return read_input_chunked($self) if env_chunked($env); + return read_input_chunked($self, $rbuf) if env_chunked($env); # env->{CONTENT_LENGTH} (identity) - my $sock = $self->{sock}; my $len = delete $self->{input_left}; - my $rbuf = \($self->{rbuf}); my $input = $env->{'psgi.input'}; while ($len > 0) { @@ -135,15 +142,15 @@ sub read_input ($) { } $$rbuf = ''; } - my $r = sysread($sock, $$rbuf, 8192); - return recv_err($self, $r, $len) unless $r; + $self->do_read($rbuf, 8192) or return recv_err($self, $len); # continue looping if $r > 0; } - app_dispatch($self, $input); + app_dispatch($self, $input, $rbuf); } sub app_dispatch { - my ($self, $input) = @_; + my ($self, $input, $rbuf) = @_; + $self->rbuf_idle($rbuf); my $env = $self->{env}; $env->{REMOTE_ADDR} = $self->{remote_addr}; $env->{REMOTE_PORT} = $self->{remote_port}; @@ -235,11 +242,12 @@ sub identity_wcb ($) { sub next_request ($) { my ($self) = @_; - if ($self->{rbuf} eq '') { # wait for next request - $self->watch_in1; - } else { # avoid recursion for pipelined requests + if ($self->{rbuf}) { + # avoid recursion for pipelined requests + PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq; push @$pipelineq, $self; - $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); + } else { # wait for next request + $self->requeue; } } @@ -267,7 +275,7 @@ sub getline_cb ($$$) { if ($self->{wbuf}) { $self->write($next); } else { - PublicInbox::EvCleanup::asap($next); + PublicInbox::DS::requeue($next); } return; } @@ -360,27 +368,25 @@ sub write_err { } sub recv_err { - my ($self, $r, $len) = @_; - return $self->close if (defined $r && $r == 0); - if ($! == EAGAIN) { + my ($self, $len) = @_; + if ($! == EAGAIN) { # epoll/kevent watch already set by do_read $self->{input_left} = $len; - return $self->watch_in1; + } else { + err($self, "error reading input: $! ($len bytes remaining)"); } - err($self, "error reading for input: $! ($len bytes remaining)"); - quit($self, 500); } sub read_input_chunked { # unlikely... - my ($self) = @_; + my ($self, $rbuf) = @_; + $rbuf //= $self->{rbuf} // (\(my $x = '')); my $input = $self->{env}->{'psgi.input'}; - my $sock = $self->{sock}; my $len = delete $self->{input_left}; - my $rbuf = \($self->{rbuf}); while (1) { # chunk start if ($len == CHUNK_ZEND) { $$rbuf =~ s/\A\r\n//s and - return app_dispatch($self, $input); + return app_dispatch($self, $input, $rbuf); + return quit($self, 400) if bytes::length($$rbuf) > 2; } if ($len == CHUNK_END) { @@ -403,9 +409,8 @@ sub read_input_chunked { # unlikely... } if ($len < 0) { # chunk header is trickled, read more - my $off = bytes::length($$rbuf); - my $r = sysread($sock, $$rbuf, 8192, $off); - return recv_err($self, $r, $len) unless $r; + $self->do_read($rbuf, 8192, bytes::length($$rbuf)) or + return recv_err($self, $len); # (implicit) goto chunk_start if $r > 0; } $len = CHUNK_ZEND if $len == 0; @@ -429,8 +434,8 @@ sub read_input_chunked { # unlikely... } if ($$rbuf eq '') { # read more of current chunk - my $r = sysread($sock, $$rbuf, 8192); - return recv_err($self, $r, $len) unless $r; + $self->do_read($rbuf, 8192) or + return recv_err($self, $len); } } } @@ -459,14 +464,7 @@ sub close { # for graceful shutdown in PublicInbox::Daemon: sub busy () { my ($self) = @_; - ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf}); + ($self->{rbuf} || $self->{env} || $self->{wbuf}); } -# fires after pending writes are complete: -sub restart_pass ($) { - $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async -} - -sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) } - 1; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index b46baeb2..584db8d4 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -4,14 +4,15 @@ # XXX This is a totally unstable API for public-inbox internal use only # This is exposed via the 'pi-httpd.async' key in the PSGI env hash. # The name of this key is not even stable! -# Currently is is intended for use with read-only pipes. +# Currently intended for use with read-only pipes with expensive +# processes such as git-http-backend(1), cgit(1) package PublicInbox::HTTPD::Async; use strict; use warnings; use base qw(PublicInbox::DS); use fields qw(cb cleanup); -require PublicInbox::EvCleanup; use Errno qw(EAGAIN); +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); sub new { my ($class, $io, $cb, $cleanup) = @_; @@ -19,47 +20,35 @@ sub new { # no $io? call $cb at the top of the next event loop to # avoid recursion: unless (defined($io)) { - PublicInbox::EvCleanup::asap($cb) if $cb; - PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup; + PublicInbox::DS::requeue($cb); + die 'cleanup unsupported w/o $io' if $cleanup; return; } my $self = fields::new($class); IO::Handle::blocking($io, 0); - $self->SUPER::new($io, PublicInbox::DS::EPOLLIN()); + $self->SUPER::new($io, EPOLLIN | EPOLLET); $self->{cb} = $cb; $self->{cleanup} = $cleanup; $self; } -sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) } - -sub main_cb ($$$) { - my ($http, $fh, $bref) = @_; +sub main_cb ($$) { + my ($http, $fh) = @_; sub { my ($self) = @_; - my $r = sysread($self->{sock}, $$bref, 8192); + my $r = sysread($self->{sock}, my $buf, 65536); if ($r) { - $fh->write($$bref); # may call $http->close - + $fh->write($buf); # may call $http->close if ($http->{sock}) { # !closed - if ($http->{wbuf}) { - # HTTP client could not keep up, so - # stop reading and buffering. - $self->watch(0); - - # Tell the HTTP socket to restart us - # when HTTP client is done draining - # $http->{wbuf}: - $http->enqueue_restart_pass; - } - # stay in EPOLLIN, but let other clients - # get some work done, too. + $self->requeue; + # let other clients get some work done, too return; } - # fall through to close below... - } elsif (!defined $r) { - return restart_read($self) if $! == EAGAIN; + + # else: fall through to close below... + } elsif (!defined $r && $! == EAGAIN) { + return; # EPOLLET means we'll be notified } # Done! Error handling will happen in $fh->close @@ -75,10 +64,16 @@ sub async_pass { # will automatically close this ($self) object. $http->{forward} = $self; $fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb - $self->{cb} = main_cb($http, $fh, $bref); + $$bref = undef; # we're done with this + my $cb = $self->{cb} = main_cb($http, $fh); + $cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy } -sub event_step { $_[0]->{cb}->(@_) } +sub event_step { + # {cb} may be undef after ->requeue due to $http->close happening + my $cb = $_[0]->{cb} or return; + $cb->(@_); +} sub close { my $self = $_[0]; @@ -87,7 +82,7 @@ sub close { # we defer this to the next timer loop since close is deferred if (my $cleanup = delete $self->{cleanup}) { - PublicInbox::EvCleanup::next_tick($cleanup); + PublicInbox::DS::requeue($cleanup); } } diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm index 94b2aed4..594dabb8 100644 --- a/lib/PublicInbox/Listener.pm +++ b/lib/PublicInbox/Listener.pm @@ -9,6 +9,7 @@ use base 'PublicInbox::DS'; use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY); use fields qw(post_accept); require IO::Handle; +use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET); sub new ($$$) { my ($class, $s, $cb) = @_; @@ -17,15 +18,14 @@ sub new ($$$) { listen($s, 1024); IO::Handle::blocking($s, 0); my $self = fields::new($class); - $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()| - PublicInbox::DS::EPOLLEXCLUSIVE()); + $self->SUPER::new($s, EPOLLIN|EPOLLET|EPOLLEXCLUSIVE); $self->{post_accept} = $cb; $self } sub event_step { my ($self) = @_; - my $sock = $self->{sock}; + my $sock = $self->{sock} or return; # no loop here, we want to fairly distribute clients # between multiple processes sharing the same socket @@ -35,6 +35,7 @@ sub event_step { if (my $addr = accept(my $c, $sock)) { IO::Handle::blocking($c, 0); # no accept4 :< $self->{post_accept}->($c, $addr, $sock); + $self->requeue; } } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 5a886a3c..26bc679f 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -6,7 +6,7 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(nntpd article rbuf ng); +use fields qw(nntpd article ng); use PublicInbox::Search; use PublicInbox::Msgmap; use PublicInbox::MID qw(mid_escape); @@ -38,20 +38,6 @@ my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr); my $EXPMAP; # fd -> [ idle_time, $self ] my $expt; our $EXPTIME = 180; # 3 minutes -my $nextt; - -my $nextq = []; -sub next_tick () { - $nextt = undef; - my $q = $nextq; - $nextq = []; - event_step($_) for @$q; -} - -sub requeue ($) { - push @$nextq, $_[0]; - $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); -} sub update_idle_time ($) { my ($self) = @_; @@ -64,14 +50,11 @@ sub expire_old () { my $exp = $EXPTIME; my $old = $now - $exp; my $nr = 0; - my $closed = 0; my %new; while (my ($fd, $v) = each %$EXPMAP) { my ($idle_time, $nntp) = @$v; if ($idle_time < $old) { - if ($nntp->shutdn) { - $closed++; - } else { + if (!$nntp->shutdn) { ++$nr; $new{$fd} = $v; } @@ -81,14 +64,7 @@ sub expire_old () { } } $EXPMAP = \%new; - if ($nr) { - $expt = PublicInbox::EvCleanup::later(*expire_old); - } else { - $expt = undef; - # noop to kick outselves out of the loop ASAP so descriptors - # really get closed - PublicInbox::EvCleanup::asap(sub {}) if $closed; - } + $expt = PublicInbox::EvCleanup::later(*expire_old) if $nr; } sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) }; @@ -99,7 +75,8 @@ sub new ($$$) { my $ev = EPOLLIN; my $wbuf; if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) { - $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock); + return CORE::close($sock) if $! != EAGAIN; + $ev = PublicInbox::TLS::epollbit(); $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ]; } $self->SUPER::new($sock, $ev | EPOLLONESHOT); @@ -654,12 +631,12 @@ sub long_response ($$) { push @$wbuf, $long_cb; # wbuf may be populated by $cb, no need to rearm if so: - requeue($self) if scalar(@$wbuf) == 1; + $self->requeue if scalar(@$wbuf) == 1; } else { # all done! $long_cb = undef; res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); - requeue($self) unless $self->{wbuf}; + $self->requeue unless $self->{wbuf}; } }; $self->write($long_cb); # kick off! @@ -914,7 +891,7 @@ sub cmd_starttls ($) { return '580 can not initiate TLS negotiation'; res($self, '382 Continue with TLS negotiation'); $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt); - requeue($self) if PublicInbox::DS::accept_tls_step($self); + $self->requeue if PublicInbox::DS::accept_tls_step($self); undef; } @@ -984,16 +961,12 @@ sub event_step { return $self->close if $r < 0; my $len = bytes::length($$rbuf); return $self->close if ($len >= LINE_MAX); - if ($len) { - $self->{rbuf} = $rbuf; - } else { - delete $self->{rbuf}; - } + $self->rbuf_idle($rbuf); update_idle_time($self); # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - requeue($self) unless $self->{wbuf}; + $self->requeue unless $self->{wbuf}; } sub not_idle_long ($$) { diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm index ccc0815e..2e2abb5f 100644 --- a/lib/PublicInbox/ParentPipe.pm +++ b/lib/PublicInbox/ParentPipe.pm @@ -1,20 +1,27 @@ # Copyright (C) 2016-2018 all contributors # License: AGPL-3.0+ -# only for PublicInbox::Daemon + +# only for PublicInbox::Daemon, allows worker processes to be +# notified if the master process dies. package PublicInbox::ParentPipe; use strict; use warnings; use base qw(PublicInbox::DS); use fields qw(cb); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); sub new ($$$) { - my ($class, $pipe, $cb) = @_; + my ($class, $pipe, $worker_quit) = @_; my $self = fields::new($class); - $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN()); - $self->{cb} = $cb; + $self->SUPER::new($pipe, EPOLLIN|EPOLLONESHOT); + $self->{cb} = $worker_quit; $self; } -sub event_step { $_[0]->{cb}->($_[0]) } +# master process died, time to call worker_quit ourselves +sub event_step { + $_[0]->close; # PublicInbox::DS::close + $_[0]->{cb}->(); +} 1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index f2630a0f..8f0b9fe2 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -128,7 +128,7 @@ sub psgi_qx { my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; my $cb = sub { - my $r = sysread($rpipe, my $buf, 8192); + my $r = sysread($rpipe, my $buf, 65536); if ($async) { $async->async_pass($env->{'psgix.io'}, $qx, \$buf); } elsif (defined $r) { diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 500efa67..d7e15c72 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -22,7 +22,7 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION); $VERSION = "0.25"; @ISA = qw(Exporter); @EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait - EPOLLIN EPOLLOUT + EPOLLIN EPOLLOUT EPOLLET EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD EPOLLONESHOT EPOLLEXCLUSIVE); %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait @@ -39,7 +39,7 @@ use constant EPOLLOUT => 4; # use constant EPOLLRDBAND => 128; use constant EPOLLEXCLUSIVE => (1 << 28); use constant EPOLLONESHOT => (1 << 30); -# use constant EPOLLET => (1 << 31); +use constant EPOLLET => (1 << 31); use constant EPOLL_CTL_ADD => 1; use constant EPOLL_CTL_DEL => 2; use constant EPOLL_CTL_MOD => 3; diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm index 576c11d7..0b9a55df 100644 --- a/lib/PublicInbox/TLS.pm +++ b/lib/PublicInbox/TLS.pm @@ -13,12 +13,9 @@ sub err () { $SSL_ERROR } # returns the EPOLL event bit which matches the existing SSL error sub epollbit () { - if ($! == EAGAIN) { - return EPOLLIN if $SSL_ERROR == SSL_WANT_READ; - return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE; - die "unexpected SSL error: $SSL_ERROR"; - } - 0; + return EPOLLIN if $SSL_ERROR == SSL_WANT_READ; + return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE; + die "unexpected SSL error: $SSL_ERROR"; } 1; diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 2d4c6f43..a76bf06e 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -150,6 +150,12 @@ sub _try_path { if (!ref($inboxes) && $inboxes eq 'watchspam') { return _remove_spam($self, $path); } + + my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + local $SIG{__WARN__} = sub { + $warn_cb->("path: $path\n"); + $warn_cb->(@_); + }; foreach my $ibx (@$inboxes) { my $mime = _path_to_mime($path) or next; my $im = _importer_for($self, $ibx); diff --git a/t/ds-leak.t b/t/ds-leak.t index dd8446dc..34ffc125 100644 --- a/t/ds-leak.t +++ b/t/ds-leak.t @@ -15,9 +15,21 @@ if ('close-on-exec for epoll and kqueue') { PublicInbox::DS->SetLoopTimeout(0); PublicInbox::DS->SetPostLoopCallback(sub { 0 }); + + # make sure execve closes if we're using fork() + my ($r, $w); + pipe($r, $w) or die "pipe: $!"; + PublicInbox::DS->AddTimer(0, sub { $pid = spawn([qw(sleep 10)]) }); PublicInbox::DS->EventLoop; ok($pid, 'subprocess spawned'); + + # wait for execve, we need to ensure lsof sees sleep(1) + # and not the fork of this process: + close $w or die "close: $!"; + my $l = <$r>; + is($l, undef, 'cloexec works and sleep(1) is running'); + my @of = grep(/$evfd_re/, `lsof -p $pid 2>/dev/null`); my $err = $?; SKIP: { diff --git a/t/httpd-https.t b/t/httpd-https.t new file mode 100644 index 00000000..f6b9806a --- /dev/null +++ b/t/httpd-https.t @@ -0,0 +1,141 @@ +# Copyright (C) 2019 all contributors +# License: AGPL-3.0+ +use strict; +use warnings; +use Test::More; +use File::Temp qw(tempdir); +use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET); +# IO::Poll is part of the standard library, but distros may split them off... +foreach my $mod (qw(IO::Socket::SSL IO::Poll)) { + eval "require $mod"; + plan skip_all => "$mod missing for $0" if $@; +} +my $cert = 'certs/server-cert.pem'; +my $key = 'certs/server-key.pem'; +unless (-r $key && -r $cert) { + plan skip_all => + "certs/ missing for $0, run ./create-certs.perl in certs/"; +} +use_ok 'PublicInbox::TLS'; +use_ok 'IO::Socket::SSL'; +require './t/common.perl'; +my $psgi = "./t/httpd-corner.psgi"; +my $tmpdir = tempdir('pi-httpd-https-XXXXXX', TMPDIR => 1, CLEANUP => 1); +my $err = "$tmpdir/stderr.log"; +my $out = "$tmpdir/stdout.log"; +my $httpd = 'blib/script/public-inbox-httpd'; +my %opts = ( + LocalAddr => '127.0.0.1', + ReuseAddr => 1, + Proto => 'tcp', + Type => SOCK_STREAM, + Listen => 1024, +); +my $https = IO::Socket::INET->new(%opts); +my ($pid, $tail_pid); +END { + foreach ($pid, $tail_pid) { + kill 'TERM', $_ if defined $_; + } +}; +my $https_addr = $https->sockhost . ':' . $https->sockport; +my %opt = ( Proto => 'tcp', PeerAddr => $https_addr, Type => SOCK_STREAM ); + +for my $args ( + [ "-lhttps://$https_addr/?key=$key,cert=$cert" ], +) { + for ($out, $err) { + open my $fh, '>', $_ or die "truncate: $!"; + } + if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail + $tail_pid = fork; + if (defined $tail_pid && $tail_pid == 0) { + exec(split(' ', $tail_cmd), $out, $err); + } + } + my $cmd = [ $httpd, '-W0', @$args, + "--stdout=$out", "--stderr=$err", $psgi ]; + $pid = spawn_listener(undef, $cmd, [ $https ]); + my %o = ( + SSL_hostname => 'server.local', + SSL_verifycn_name => 'server.local', + SSL_verify_mode => SSL_VERIFY_PEER(), + SSL_ca_file => 'certs/test-ca.pem', + ); + # start negotiating a slow TLS connection + my $slow = IO::Socket::INET->new(%opt, Blocking => 0); + $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o); + my @poll = (fileno($slow)); + my $slow_done = $slow->connect_SSL; + if ($slow_done) { + diag('W: connect_SSL early OK, slow client test invalid'); + push @poll, PublicInbox::Syscall::EPOLLOUT(); + } else { + push @poll, PublicInbox::TLS::epollbit(); + } + + # normal HTTPS + my $c = IO::Socket::INET->new(%opt); + IO::Socket::SSL->start_SSL($c, %o); + ok($c->print("GET /empty HTTP/1.1\r\n\r\nHost: example.com\r\n\r\n"), + 'wrote HTTP request'); + my $buf = ''; + sysread($c, $buf, 2007, length($buf)) until $buf =~ /\r\n\r\n/; + like($buf, qr!\AHTTP/1\.1 200!, 'read HTTP response'); + + # HTTPS with bad hostname + $c = IO::Socket::INET->new(%opt); + $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.fail'; + $c = IO::Socket::SSL->start_SSL($c, %o); + is($c, undef, 'HTTPS fails with bad hostname'); + + $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local'; + $c = IO::Socket::INET->new(%opt); + IO::Socket::SSL->start_SSL($c, %o); + ok($c, 'HTTPS succeeds again with valid hostname'); + + # slow TLS connection did not block the other fast clients while + # connecting, finish it off: + until ($slow_done) { + IO::Poll::_poll(-1, @poll); + $slow_done = $slow->connect_SSL and last; + @poll = (fileno($slow), PublicInbox::TLS::epollbit()); + } + $slow->blocking(1); + ok($slow->print("GET /empty HTTP/1.1\r\n\r\nHost: example.com\r\n\r\n"), + 'wrote HTTP request from slow'); + $buf = ''; + sysread($slow, $buf, 666, length($buf)) until $buf =~ /\r\n\r\n/; + like($buf, qr!\AHTTP/1\.1 200!, 'read HTTP response from slow'); + $slow = undef; + + SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($https, IPPROTO_TCP, $var)) or die; + ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on https'); + }; + SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd'; + if (system('kldstat -m accf_data >/dev/null')) { + skip 'accf_data not loaded? kldload accf_data', 2; + } + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + my $x = getsockopt($https, SOL_SOCKET, $var); + like($x, qr/\Adataready\0+\z/, 'got dataready accf for https'); + }; + + $c = undef; + kill('TERM', $pid); + is($pid, waitpid($pid, 0), 'httpd exited successfully'); + is($?, 0, 'no error in exited process'); + $pid = undef; + if (defined $tail_pid) { + kill 'TERM', $tail_pid; + waitpid($tail_pid, 0); + $tail_pid = undef; + } +} +done_testing(); +1; diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t index 427d370f..e3ecdd4f 100644 --- a/t/nntpd-tls.t +++ b/t/nntpd-tls.t @@ -11,6 +11,9 @@ foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) { eval "require $mod"; plan skip_all => "$mod missing for $0" if $@; } +Net::NNTP->can('starttls') or + plan skip_all => 'Net::NNTP does not support TLS'; + my $cert = 'certs/server-cert.pem'; my $key = 'certs/server-key.pem'; unless (-r $key && -r $cert) { diff --git a/t/nntpd.t b/t/nntpd.t index 6cba2be4..bf5bb883 100644 --- a/t/nntpd.t +++ b/t/nntpd.t @@ -106,8 +106,13 @@ EOF is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works'); is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works'); is_deeply($n->listgroup($group), [1], 'listgroup OK'); - ok(!$n->starttls, 'STARTTLS fails when unconfigured'); - is($n->code, 580, 'got 580 code on server w/o TLS'); + + SKIP: { + $n->can('starttls') or + skip('Net::NNTP too old to support STARTTLS', 2); + ok(!$n->starttls, 'STARTTLS fails when unconfigured'); + is($n->code, 580, 'got 580 code on server w/o TLS'); + }; %opts = ( PeerAddr => $host_port,