X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FQspawn.pm;h=53d0ad55ee84d47ccc10bd2d7516136fd38207e8;hb=23af251dd607c4e75ab1e68063f2c885c48cc035;hp=1985dccd972fb24aca25578a9c3720a8202fc3a8;hpb=2131a3cd0a1cad6f7d6a2c6439676f9f2a039ff7;p=public-inbox.git diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 1985dccd..53d0ad55 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 all contributors +# Copyright (C) 2016-2021 all contributors # License: AGPL-3.0+ # Like most Perl modules in public-inbox, this is internal and @@ -12,12 +12,13 @@ # operate in. This can be useful to ensure smaller inboxes can # be cloned while cloning of large inboxes is maxed out. # -# This does not depend on PublicInbox::DS or any other external -# scheduling mechanism, you just need to call start() and finish() -# appropriately. However, public-inbox-httpd (which uses PublicInbox::DS) -# will be able to schedule this based on readability of stdout from -# the spawned process. See GitHTTPBackend.pm and SolverGit.pm for -# usage examples. It does not depend on any form of threading. +# This does not depend on the PublicInbox::DS::event_loop or any +# other external scheduling mechanism, you just need to call +# start() and finish() appropriately. However, public-inbox-httpd +# (which uses PublicInbox::DS) will be able to schedule this +# based on readability of stdout from the spawned process. +# See GitHTTPBackend.pm and SolverGit.pm for usage examples. +# It does not depend on any form of threading. # # This is useful for scheduling CGI execution of both long-lived # git-http-backend(1) process (for "git clone") as well as short-lived @@ -25,9 +26,8 @@ package PublicInbox::Qspawn; use strict; -use warnings; use PublicInbox::Spawn qw(popen_rd); -require Plack::Util; +use PublicInbox::GzipFilter; # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -46,27 +46,25 @@ sub new ($$$;) { sub _do_spawn { my ($self, $start_cb, $limiter) = @_; my $err; - my ($cmd, $cmd_env, $opts) = @{$self->{args}}; - my %opts = %{$opts || {}}; + my ($cmd, $cmd_env, $opt) = @{delete $self->{args}}; + my %o = %{$opt || {}}; $self->{limiter} = $limiter; - foreach my $k (PublicInbox::Spawn::RLIMITS()) { + foreach my $k (@PublicInbox::Spawn::RLIMITS) { if (defined(my $rlimit = $limiter->{$k})) { - $opts{$k} = $rlimit; + $o{$k} = $rlimit; } } + $self->{cmd} = $o{quiet} ? undef : $cmd; + eval { + # popen_rd may die on EMFILE, ENFILE + $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o); - ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $cmd_env, \%opts); - - # drop any IO handles opt was holding open via $opt->{hold} - # No need to hold onto the descriptor once the child process has it. - $self->{args} = $cmd; # keep this around for logging + die "E: $!" unless defined($self->{rpipe}); - if (defined $self->{pid}) { $limiter->{running}++; - } else { - $self->{err} = $!; - } - $start_cb->($self->{rpipe}); + $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM + }; + finish($self, $@) if $@; } sub child_err ($) { @@ -78,21 +76,8 @@ sub child_err ($) { $msg; } -sub log_err ($$) { - my ($env, $msg) = @_; - $env->{'psgi.errors'}->print($msg, "\n"); -} - -# callback for dwaitpid -sub waitpid_err ($$) { - my ($self, $pid) = @_; - my $xpid = delete $self->{pid}; - my $err; - if ($pid > 0) { # success! - $err = child_err($?); - } elsif ($pid < 0) { # ??? does this happen in our case? - $err = "W: waitpid($xpid, 0) => $pid: $!"; - } # else should not be called with pid == 0 +sub finalize ($$) { + my ($self, $err) = @_; my ($env, $qx_cb, $qx_arg, $qx_buf) = delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; @@ -108,39 +93,35 @@ sub waitpid_err ($$) { } if ($err) { - $self->{err} = $err; - if ($env && !$env->{'qspawn.quiet'}) { - log_err($env, join(' ', @{$self->{args}}) . ": $err"); + if (defined $self->{err}) { + $self->{err} .= "; $err"; + } else { + $self->{err} = $err; + } + if ($env && $self->{cmd}) { + warn join(' ', @{$self->{cmd}}) . ": $err"; } } - eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb; -} - -sub do_waitpid ($) { - my ($self) = @_; - my $pid = $self->{pid}; - # PublicInbox::DS may not be loaded - eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) }; - # done if we're running in PublicInbox::DS::EventLoop - if ($@) { - # non public-inbox-{httpd,nntpd} callers may block: - my $ret = waitpid($pid, 0); - waitpid_err($self, $ret); + if ($qx_cb) { + eval { $qx_cb->($qx_buf, $qx_arg) }; + } elsif (my $wcb = delete $env->{'qspawn.wcb'}) { + # have we started writing, yet? + require PublicInbox::WwwStatic; + $wcb->(PublicInbox::WwwStatic::r(500)); } } -sub finish ($) { - my ($self) = @_; - if (delete $self->{rpipe}) { - do_waitpid($self); - } else { - my ($env, $qx_cb, $qx_arg, $qx_buf) = - delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; - eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb; - } +# callback for dwaitpid or ProcessPipe +sub waitpid_err { finalize($_[0], child_err($?)) } + +sub finish ($;$) { + my ($self, $err) = @_; + my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err); + my PublicInbox::ProcessPipe $pp = tied *$tied_pp; + @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY } -sub start { +sub start ($$$) { my ($self, $limiter, $start_cb) = @_; if ($limiter->{running} < $limiter->{max}) { _do_spawn($self, $start_cb, $limiter); @@ -149,6 +130,37 @@ sub start { } } +sub psgi_qx_init_cb { + my ($self) = @_; + my $async = delete $self->{async}; + my ($r, $buf); + my $qx_fh = $self->{qx_fh}; +reread: + $r = sysread($self->{rpipe}, $buf, 65536); + if ($async) { + $async->async_pass($self->{psgi_env}->{'psgix.io'}, + $qx_fh, \$buf); + } elsif (defined $r) { + $r ? (print $qx_fh $buf) : event_step($self, undef); + } else { + return if $! == EAGAIN; # try again when notified + goto reread if $! == EINTR; + event_step($self, $!); + } +} + +sub psgi_qx_start { + my ($self) = @_; + if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) { + # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj) + $self->{async} = $async->($self->{rpipe}, + \&psgi_qx_init_cb, $self, $self); + # init_cb will call ->async_pass or ->close + } else { # generic PSGI + psgi_qx_init_cb($self) while $self->{qx_fh}; + } +} + # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with # the stdout of the given command when done; but respects the given limiter # $env is the PSGI env. As with ``/qx; only use this when output is small @@ -162,44 +174,8 @@ sub psgi_qx { $self->{qx_arg} = $qx_arg; $self->{qx_fh} = $qx_fh; $self->{qx_buf} = \$qx_buf; - my $async = $env->{'pi-httpd.async'}; - my $cb = sub { - my ($r, $buf); -reread: - $r = sysread($self->{rpipe}, $buf, 65536); - if ($async) { - $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf); - } elsif (defined $r) { - $r ? $qx_fh->write($buf) : event_step($self, undef); - } else { - return if $! == EAGAIN; # try again when notified - goto reread if $! == EINTR; - event_step($self, $!); - } - }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - $self->start($limiter, sub { # start_cb, may run later, much later... - if ($async) { - # PublicInbox::HTTPD::Async->new(rpipe, $cb, $end_obj) - $async = $async->($self->{rpipe}, $cb, undef, $self); - # $cb will call ->async_pass or ->close - } else { # generic PSGI - $cb->() while $self->{qx_fh}; - } - }); -} - -# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async -sub filter_fh ($$) { - my ($fh, $filter) = @_; - Plack::Util::inline_object( - close => sub { - $fh->write($filter->(undef)); - $fh->close; - }, - write => sub { - $fh->write($filter->($_[0])); - }); + start($self, $limiter, \&psgi_qx_start); } # this is called on pipe EOF to reap the process, may be called @@ -207,7 +183,7 @@ sub filter_fh ($$) { # PSGI servers. sub event_step { my ($self, $err) = @_; # $err: $! - log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err); + warn "psgi_{return,qx} $err" if defined($err); finish($self); my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)}); $fh->close if $fh; # async-only (psgi_return) @@ -216,22 +192,26 @@ sub event_step { sub rd_hdr ($) { my ($self) = @_; # typically used for reading CGI headers - # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm # We also need to check EINTR for generic PSGI servers. my $ret; my $total_rd = 0; my $hdr_buf = $self->{hdr_buf}; + my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}}; do { my $r = sysread($self->{rpipe}, $$hdr_buf, 4096, length($$hdr_buf)); if (defined($r)) { $total_rd += $r; - $ret = $self->{parse_hdr}->($total_rd, $hdr_buf); + eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) }; + if ($@) { + warn "parse_hdr: $@"; + $ret = [ 500, [], [ "Internal error\n" ] ]; + } } else { # caller should notify us when it's ready: return if $! == EAGAIN; next if $! == EINTR; # immediate retry - log_err($self->{psgi_env}, "error reading header: $!"); + warn "error reading header: $!"; $ret = [ 500, [], [ "Internal error\n" ] ]; } } until (defined $ret); @@ -239,6 +219,51 @@ sub rd_hdr ($) { $ret; } +sub psgi_return_init_cb { + my ($self) = @_; + my $r = rd_hdr($self) or return; + my $env = $self->{psgi_env}; + my $filter = delete $env->{'qspawn.filter'} // + PublicInbox::GzipFilter::qsp_maybe($r->[1], $env); + + my $wcb = delete $env->{'qspawn.wcb'}; + my $async = delete $self->{async}; + if (scalar(@$r) == 3) { # error + if ($async) { + # calls rpipe->close && ->event_step + $async->close; + } else { + $self->{rpipe}->close; + event_step($self); + } + $wcb->($r); + } elsif ($async) { + # done reading headers, handoff to read body + my $fh = $wcb->($r); # scalar @$r == 2 + $fh = $filter->attach($fh) if $filter; + $self->{fh} = $fh; + $async->async_pass($env->{'psgix.io'}, $fh, + delete($self->{hdr_buf})); + } else { # for synchronous PSGI servers + require PublicInbox::GetlineBody; + $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, + \&event_step, $self, + ${$self->{hdr_buf}}, $filter); + $wcb->($r); + } +} + +sub psgi_return_start { # may run later, much later... + my ($self) = @_; + if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) { + # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj) + $self->{async} = $async->($self->{rpipe}, + \&psgi_return_init_cb, $self, $self); + } else { # generic PSGI + psgi_return_init_cb($self) while $self->{parse_hdr}; + } +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. @@ -249,8 +274,9 @@ sub rd_hdr ($) { # psgi_return will return an anonymous # sub for the PSGI server to call # -# $env->{'qspawn.filter'} - filter callback, receives a string as input, -# undef on EOF +# $env->{'qspawn.filter'} - filter object, responds to ->attach for +# pi-httpd.async and ->translate for generic +# PSGI servers # # $limiter - the Limiter object to use (uses the def_limiter if not given) # @@ -262,68 +288,25 @@ sub rd_hdr ($) { # psgix.io. 3-element arrays means the body is available # immediately (or streamed via ->getline (pull-based)). sub psgi_return { - my ($self, $env, $limiter, $parse_hdr) = @_; + my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_; $self->{psgi_env} = $env; $self->{hdr_buf} = \(my $hdr_buf = ''); - $self->{parse_hdr} = $parse_hdr; - my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it - my $async = $env->{'pi-httpd.async'}; - - my $cb = sub { - my $r = rd_hdr($self) or return; - my $filter = delete $env->{'qspawn.filter'}; - if (scalar(@$r) == 3) { # error - if ($async) { - # calls rpipe->close && ->event_step - $async->close; - } else { - $self->{rpipe}->close; - event_step($self); - } - $wcb->($r); - } elsif ($async) { - # done reading headers, handoff to read body - my $fh = $wcb->($r); # scalar @$r == 2 - $fh = filter_fh($fh, $filter) if $filter; - $self->{fh} = $fh; - $async->async_pass($env->{'psgix.io'}, $fh, - delete($self->{hdr_buf})); - } else { # for synchronous PSGI servers - require PublicInbox::GetlineBody; - $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, - \&event_step, $self, - ${$self->{hdr_buf}}, $filter); - $wcb->($r); - } - - # Workaround a leak under Perl 5.16.3 when combined with - # Plack::Middleware::Deflater: - $wcb = undef; - }; + $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ]; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - my $start_cb = sub { # may run later, much later... - if ($async) { - # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, - # $end_obj) - $async = $async->($self->{rpipe}, $cb, undef, $self); - # $cb will call ->async_pass or ->close - } else { # generic PSGI - $cb->() while $self->{parse_hdr}; - } - }; # the caller already captured the PSGI write callback from # the PSGI server, so we can call ->start, here: - return $self->start($limiter, $start_cb) if $wcb; + $env->{'qspawn.wcb'} and + return start($self, $limiter, \&psgi_return_start); # the caller will return this sub to the PSGI server, so - # it can set the response callback (that is, for PublicInbox::HTTP, - # the chunked_wcb or identity_wcb callback), but other HTTP servers - # are supported: + # it can set the response callback (that is, for + # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), + # but other HTTP servers are supported: sub { - ($wcb) = @_; - $self->start($limiter, $start_cb); - }; + $env->{'qspawn.wcb'} = $_[0]; + start($self, $limiter, \&psgi_return_start); + } } package PublicInbox::Qspawn::Limiter; @@ -344,12 +327,12 @@ sub new { } sub setup_rlimit { - my ($self, $name, $config) = @_; - foreach my $rlim (PublicInbox::Spawn::RLIMITS()) { + my ($self, $name, $cfg) = @_; + foreach my $rlim (@PublicInbox::Spawn::RLIMITS) { my $k = lc($rlim); $k =~ tr/_//d; $k = "publicinboxlimiter.$name.$k"; - defined(my $v = $config->{$k}) or next; + defined(my $v = $cfg->{$k}) or next; my @rlimit = split(/\s*,\s*/, $v); if (scalar(@rlimit) == 1) { push @rlimit, $rlimit[0];