X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FQspawn.pm;h=63ec3648c933bdb7775a51a2fe90e3cfc235231c;hb=95bdac7f09c69036efed537a4d03d5bdd2ae4eb6;hp=f2630a0f0109a624690edf38b849423b9b0af8dd;hpb=84d8920b92686e975929aebe845b6d4ea0a9ef0d;p=public-inbox.git diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index f2630a0f..63ec3648 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 all contributors +# Copyright (C) 2016-2020 all contributors # License: AGPL-3.0+ # Like most Perl modules in public-inbox, this is internal and @@ -27,7 +27,6 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); -require Plack::Util; # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -36,32 +35,38 @@ my $def_limiter; # declares a command to spawn (but does not spawn it). # $cmd is the command to spawn -# $env is the environ for the child process +# $cmd_env is the environ for the child process (not PSGI env) # $opt can include redirects and perhaps other process spawning options sub new ($$$;) { - my ($class, $cmd, $env, $opt) = @_; - bless { args => [ $cmd, $env, $opt ] }, $class; + my ($class, $cmd, $cmd_env, $opt) = @_; + bless { args => [ $cmd, $cmd_env, $opt ] }, $class; } sub _do_spawn { - my ($self, $cb) = @_; + my ($self, $start_cb, $limiter) = @_; my $err; - my ($cmd, $env, $opts) = @{$self->{args}}; - my %opts = %{$opts || {}}; - my $limiter = $self->{limiter}; + my ($cmd, $cmd_env, $opt) = @{$self->{args}}; + my %o = %{$opt || {}}; + $self->{limiter} = $limiter; foreach my $k (PublicInbox::Spawn::RLIMITS()) { if (defined(my $rlimit = $limiter->{$k})) { - $opts{$k} = $rlimit; + $o{$k} = $rlimit; } } + eval { + # popen_rd may die on EMFILE, ENFILE + ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $cmd_env, \%o); + $self->{args} = $o{quiet} ? undef : $cmd; + + die "E: $!" unless defined($self->{pid}); - ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $env, \%opts); - if (defined $self->{pid}) { $limiter->{running}++; - } else { - $self->{err} = $!; + $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM + }; + if ($@) { + $self->{err} = $@; + finish($self); } - $cb->($self->{rpipe}); } sub child_err ($) { @@ -73,43 +78,109 @@ sub child_err ($) { $msg; } -sub finish ($) { - my ($self) = @_; +sub log_err ($$) { + my ($env, $msg) = @_; + $env->{'psgi.errors'}->print($msg, "\n"); +} + +# callback for dwaitpid +sub waitpid_err ($$) { + my ($self, $pid) = @_; + my $xpid = delete $self->{pid}; + my $err; + if ($pid > 0) { # success! + $err = child_err($?); + } elsif ($pid < 0) { # ??? does this happen in our case? + $err = "W: waitpid($xpid, 0) => $pid: $!"; + } # else should not be called with pid == 0 + + my ($env, $qx_cb, $qx_arg, $qx_buf) = + delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; + + # done, spawn whatever's in the queue my $limiter = $self->{limiter}; - my $running; - if (delete $self->{rpipe}) { - my $pid = delete $self->{pid}; - $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) : - "PID:$pid still running?"; - $running = --$limiter->{running}; + my $running = --$limiter->{running}; + + if ($running < $limiter->{max}) { + if (my $next = shift(@{$limiter->{run_queue}})) { + _do_spawn(@$next, $limiter); + } } - # limiter->{max} may change dynamically - if (($running || $limiter->{running}) < $limiter->{max}) { - if (my $next = shift @{$limiter->{run_queue}}) { - _do_spawn(@$next); + if ($err) { + if ($self->{err}) { + $self->{err} .= "; $err"; + } else { + $self->{err} = $err; + } + if ($env && $self->{args}) { + log_err($env, join(' ', @{$self->{args}}) . ": $err"); } } - $self->{err}; + eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb; } -sub start { - my ($self, $limiter, $cb) = @_; - $self->{limiter} = $limiter; +sub do_waitpid ($) { + my ($self) = @_; + my $pid = $self->{pid}; + # PublicInbox::DS may not be loaded + eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) }; + # done if we're running in PublicInbox::DS::EventLoop + if ($@) { + # non public-inbox-{httpd,nntpd} callers may block: + my $ret = waitpid($pid, 0); + waitpid_err($self, $ret); + } +} + +sub finish ($) { + my ($self) = @_; + if (delete $self->{rpipe}) { + do_waitpid($self); + } else { + my ($env, $qx_cb, $qx_arg, $qx_buf) = + delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; + eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb; + } +} +sub start ($$$) { + my ($self, $limiter, $start_cb) = @_; if ($limiter->{running} < $limiter->{max}) { - _do_spawn($self, $cb); + _do_spawn($self, $start_cb, $limiter); } else { - push @{$limiter->{run_queue}}, [ $self, $cb ]; + push @{$limiter->{run_queue}}, [ $self, $start_cb ]; } } -sub _psgi_finish ($$) { - my ($self, $env) = @_; - my $err = $self->finish; - if ($err && !$env->{'qspawn.quiet'}) { - $err = join(' ', @{$self->{args}->[0]}).": $err\n"; - $env->{'psgi.errors'}->print($err); +sub psgi_qx_init_cb { + my ($self) = @_; + my $async = delete $self->{async}; + my ($r, $buf); + my $qx_fh = $self->{qx_fh}; +reread: + $r = sysread($self->{rpipe}, $buf, 65536); + if ($async) { + $async->async_pass($self->{psgi_env}->{'psgix.io'}, + $qx_fh, \$buf); + } elsif (defined $r) { + $r ? (print $qx_fh $buf) : event_step($self, undef); + } else { + return if $! == EAGAIN; # try again when notified + goto reread if $! == EINTR; + event_step($self, $!); + } +} + +sub psgi_qx_start { + my ($self) = @_; + if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) { + # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj) + $self->{async} = $async->($self->{rpipe}, + \&psgi_qx_init_cb, $self, $self); + # init_cb will call ->async_pass or ->close + } else { # generic PSGI + psgi_qx_init_cb($self) while $self->{qx_fh}; } } @@ -118,49 +189,99 @@ sub _psgi_finish ($$) { # $env is the PSGI env. As with ``/qx; only use this when output is small # and safe to slurp. sub psgi_qx { - my ($self, $env, $limiter, $qx_cb) = @_; - my $qx = PublicInbox::Qspawn::Qx->new; - my $end = sub { - _psgi_finish($self, $env); - eval { $qx_cb->($qx) }; - $qx = undef; - }; - my $rpipe; # comes from popen_rd - my $async = $env->{'pi-httpd.async'}; - my $cb = sub { - my $r = sysread($rpipe, my $buf, 8192); - if ($async) { - $async->async_pass($env->{'psgix.io'}, $qx, \$buf); - } elsif (defined $r) { - $r ? $qx->write($buf) : $end->(); + my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_; + $self->{psgi_env} = $env; + my $qx_buf = ''; + open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar + $self->{qx_cb} = $qx_cb; + $self->{qx_arg} = $qx_arg; + $self->{qx_fh} = $qx_fh; + $self->{qx_buf} = \$qx_buf; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + start($self, $limiter, \&psgi_qx_start); +} + +# this is called on pipe EOF to reap the process, may be called +# via PublicInbox::DS event loop OR via GetlineBody for generic +# PSGI servers. +sub event_step { + my ($self, $err) = @_; # $err: $! + log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err); + finish($self); + my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)}); + $fh->close if $fh; # async-only (psgi_return) +} + +sub rd_hdr ($) { + my ($self) = @_; + # typically used for reading CGI headers + # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm + # We also need to check EINTR for generic PSGI servers. + my $ret; + my $total_rd = 0; + my $hdr_buf = $self->{hdr_buf}; + my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}}; + do { + my $r = sysread($self->{rpipe}, $$hdr_buf, 4096, + length($$hdr_buf)); + if (defined($r)) { + $total_rd += $r; + $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg); } else { - return if $! == EAGAIN || $! == EINTR; # loop again - $end->(); + # caller should notify us when it's ready: + return if $! == EAGAIN; + next if $! == EINTR; # immediate retry + log_err($self->{psgi_env}, "error reading header: $!"); + $ret = [ 500, [], [ "Internal error\n" ] ]; } - }; - $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - $self->start($limiter, sub { # may run later, much later... - ($rpipe) = @_; # popen_rd result + } until (defined $ret); + delete $self->{parse_hdr}; # done parsing headers + $ret; +} + +sub psgi_return_init_cb { + my ($self) = @_; + my $r = rd_hdr($self) or return; + my $env = $self->{psgi_env}; + my $wcb = delete $env->{'qspawn.wcb'}; + my $async = delete $self->{async}; + if (scalar(@$r) == 3) { # error if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) - $async = $async->($rpipe, $cb, $end); - } else { # generic PSGI - $cb->() while $qx; + # calls rpipe->close && ->event_step + $async->close; + } else { + $self->{rpipe}->close; + event_step($self); } - }); + $wcb->($r); + } elsif ($async) { + # done reading headers, handoff to read body + my $fh = $wcb->($r); # scalar @$r == 2 + $self->{fh} = $fh; + $async->async_pass($env->{'psgix.io'}, $fh, + delete($self->{hdr_buf})); + } else { # for synchronous PSGI servers + require PublicInbox::GetlineBody; + $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, + \&event_step, $self, + ${$self->{hdr_buf}}); + $wcb->($r); + } + + # Workaround a leak under Perl 5.16.3 when combined with + # Plack::Middleware::Deflater: + $wcb = undef; } -# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async -sub filter_fh ($$) { - my ($fh, $filter) = @_; - Plack::Util::inline_object( - close => sub { - $fh->write($filter->(undef)); - $fh->close; - }, - write => sub { - $fh->write($filter->($_[0])); - }); +sub psgi_return_start { # may run later, much later... + my ($self) = @_; + if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) { + # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj) + $self->{async} = $async->($self->{rpipe}, + \&psgi_return_init_cb, $self, $self); + } else { # generic PSGI + psgi_return_init_cb($self) while $self->{parse_hdr}; + } } # Used for streaming the stdout of one process as a PSGI response. @@ -173,9 +294,6 @@ sub filter_fh ($$) { # psgi_return will return an anonymous # sub for the PSGI server to call # -# $env->{'qspawn.filter'} - filter callback, receives a string as input, -# undef on EOF -# # $limiter - the Limiter object to use (uses the def_limiter if not given) # # $parse_hdr - Initial read function; often for parsing CGI header output. @@ -186,69 +304,25 @@ sub filter_fh ($$) { # psgix.io. 3-element arrays means the body is available # immediately (or streamed via ->getline (pull-based)). sub psgi_return { - my ($self, $env, $limiter, $parse_hdr) = @_; - my ($fh, $rpipe); - my $end = sub { - _psgi_finish($self, $env); - $fh->close if $fh; # async-only - }; - - my $buf = ''; - my $rd_hdr = sub { - my $r = sysread($rpipe, $buf, 1024, length($buf)); - return if !defined($r) && $! == EAGAIN || $! == EINTR; - $parse_hdr->($r, \$buf); - }; - - my $wcb = delete $env->{'qspawn.wcb'}; - my $async = $env->{'pi-httpd.async'}; - - my $cb = sub { - my $r = $rd_hdr->() or return; - $rd_hdr = undef; - my $filter = delete $env->{'qspawn.filter'}; - if (scalar(@$r) == 3) { # error - if ($async) { - $async->close; # calls rpipe->close and $end - } else { - $rpipe->close; - $end->(); - } - $wcb->($r); - } elsif ($async) { - $fh = $wcb->($r); # scalar @$r == 2 - $fh = filter_fh($fh, $filter) if $filter; - $async->async_pass($env->{'psgix.io'}, $fh, \$buf); - } else { # for synchronous PSGI servers - require PublicInbox::GetlineBody; - $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, - $buf, $filter); - $wcb->($r); - } - }; + my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_; + $self->{psgi_env} = $env; + $self->{hdr_buf} = \(my $hdr_buf = ''); + $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ]; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - my $start_cb = sub { # may run later, much later... - ($rpipe) = @_; - if ($async) { - # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) - $async = $async->($rpipe, $cb, $end); - } else { # generic PSGI - $cb->() while $rd_hdr; - } - }; # the caller already captured the PSGI write callback from # the PSGI server, so we can call ->start, here: - return $self->start($limiter, $start_cb) if $wcb; + $env->{'qspawn.wcb'} and + return start($self, $limiter, \&psgi_return_start); # the caller will return this sub to the PSGI server, so - # it can set the response callback (that is, for PublicInbox::HTTP, - # the chunked_wcb or identity_wcb callback), but other HTTP servers - # are supported: + # it can set the response callback (that is, for + # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), + # but other HTTP servers are supported: sub { - ($wcb) = @_; - $self->start($limiter, $start_cb); - }; + $env->{'qspawn.wcb'} = $_[0]; + start($self, $limiter, \&psgi_return_start); + } } package PublicInbox::Qspawn::Limiter; @@ -294,21 +368,4 @@ sub setup_rlimit { } } -# captures everything into a buffer and executes a callback when done -package PublicInbox::Qspawn::Qx; -use strict; -use warnings; - -sub new { - my ($class) = @_; - my $buf = ''; - bless \$buf, $class; -} - -# called by PublicInbox::HTTPD::Async ($fh->write) -sub write { - ${$_[0]} .= $_[1]; - undef; -} - 1;