X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FQspawn.pm;h=651fa39096d7144beb7d9bb6030fcb67c9873df0;hb=4d44a2ca68ba1b88ee5245df7b74492f9e41db6d;hp=913fac817c9046d61651cf3846ab9e44fe652760;hpb=cee59522e00810f508aabce5a46e63bddcc203cc;p=public-inbox.git diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 913fac81..651fa390 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -1,33 +1,72 @@ -# Copyright (C) 2016-2018 all contributors +# Copyright (C) 2016-2019 all contributors # License: AGPL-3.0+ -# Limits the number of processes spawned -# This does not depend on Danga::Socket or any other external -# scheduling mechanism, you just need to call start and finish -# appropriately +# Like most Perl modules in public-inbox, this is internal and +# NOT subject to any stability guarantees! It is only documented +# for other hackers. +# +# This is used to limit the number of processes spawned by the +# PSGI server, so it acts like a semaphore and queues up extra +# commands to be run if currently at the limit. Multiple "limiters" +# may be configured which give inboxes different channels to +# operate in. This can be useful to ensure smaller inboxes can +# be cloned while cloning of large inboxes is maxed out. +# +# This does not depend on PublicInbox::DS or any other external +# scheduling mechanism, you just need to call start() and finish() +# appropriately. However, public-inbox-httpd (which uses PublicInbox::DS) +# will be able to schedule this based on readability of stdout from +# the spawned process. See GitHTTPBackend.pm and SolverGit.pm for +# usage examples. It does not depend on any form of threading. +# +# This is useful for scheduling CGI execution of both long-lived +# git-http-backend(1) process (for "git clone") as well as short-lived +# processes such as git-apply(1). + package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); require Plack::Util; + +# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers +use Errno qw(EAGAIN EINTR); + my $def_limiter; +# declares a command to spawn (but does not spawn it). +# $cmd is the command to spawn +# $env is the environ for the child process +# $opt can include redirects and perhaps other process spawning options sub new ($$$;) { my ($class, $cmd, $env, $opt) = @_; bless { args => [ $cmd, $env, $opt ] }, $class; } sub _do_spawn { - my ($self, $cb) = @_; + my ($self, $start_cb, $limiter) = @_; my $err; + my ($cmd, $env, $opts) = @{$self->{args}}; + my %opts = %{$opts || {}}; + $self->{limiter} = $limiter; + foreach my $k (PublicInbox::Spawn::RLIMITS()) { + if (defined(my $rlimit = $limiter->{$k})) { + $opts{$k} = $rlimit; + } + } + + ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $env, \%opts); + + # drop any IO handles opt was holding open via $opt->{hold} + # No need to hold onto the descriptor once the child process has it. + $self->{args} = $cmd; # keep this around for logging - ($self->{rpipe}, $self->{pid}) = popen_rd(@{$self->{args}}); if (defined $self->{pid}) { - $self->{limiter}->{running}++; + $limiter->{running}++; } else { $self->{err} = $!; } - $cb->($self->{rpipe}); + $start_cb->($self->{rpipe}); } sub child_err ($) { @@ -39,73 +78,115 @@ sub child_err ($) { $msg; } -sub finish ($) { - my ($self) = @_; +sub log_err ($$) { + my ($env, $msg) = @_; + $env->{'psgi.errors'}->print($msg, "\n"); +} + +# callback for dwaitpid +sub waitpid_err ($$) { + my ($self, $pid) = @_; + my $xpid = delete $self->{pid}; + my $err; + if ($pid > 0) { # success! + $err = child_err($?); + } elsif ($pid < 0) { # ??? does this happen in our case? + $err = "W: waitpid($xpid, 0) => $pid: $!"; + } # else should not be called with pid == 0 + + my $env = delete $self->{env}; + + # done, spawn whatever's in the queue my $limiter = $self->{limiter}; - my $running; - if (delete $self->{rpipe}) { - my $pid = delete $self->{pid}; - $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) : - "PID:$pid still running?"; - $running = --$limiter->{running}; + my $running = --$limiter->{running}; + + if ($running < $limiter->{max}) { + if (my $next = shift(@{$limiter->{run_queue}})) { + _do_spawn(@$next, $limiter); + } } - # limiter->{max} may change dynamically - if (($running || $limiter->{running}) < $limiter->{max}) { - if (my $next = shift @{$limiter->{run_queue}}) { - _do_spawn(@$next); + if ($err) { + $self->{err} = $err; + if ($env && !$env->{'qspawn.quiet'}) { + log_err($env, join(' ', @{$self->{args}}) . ": $err"); } } - $self->{err}; + if (my $fin_cb = delete $self->{fin_cb}) { + eval { $fin_cb->() } + } } -sub start { - my ($self, $limiter, $cb) = @_; - $self->{limiter} = $limiter; +sub do_waitpid ($;$$) { + my ($self, $env, $fin_cb) = @_; + my $pid = $self->{pid}; + $self->{env} = $env; + $self->{fin_cb} = $fin_cb; + # PublicInbox::DS may not be loaded + eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) }; + # done if we're running in PublicInbox::DS::EventLoop + if ($@) { + # non public-inbox-{httpd,nntpd} callers may block: + my $ret = waitpid($pid, 0); + waitpid_err($self, $ret); + } +} - if ($limiter->{running} < $limiter->{max}) { - _do_spawn($self, $cb); - } else { - push @{$limiter->{run_queue}}, [ $self, $cb ]; +sub finish ($;$$) { + my ($self, $env, $fin_cb) = @_; + if (delete $self->{rpipe}) { + do_waitpid($self, $env, $fin_cb); + } elsif ($fin_cb) { + eval { $fin_cb->() }; } } -sub _psgi_finish ($$) { - my ($self, $env) = @_; - my $err = $self->finish; - if ($err && !$env->{'qspawn.quiet'}) { - $err = join(' ', @{$self->{args}->[0]}).": $err\n"; - $env->{'psgi.errors'}->print($err); +sub start { + my ($self, $limiter, $start_cb) = @_; + if ($limiter->{running} < $limiter->{max}) { + _do_spawn($self, $start_cb, $limiter); + } else { + push @{$limiter->{run_queue}}, [ $self, $start_cb ]; } } +# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with +# the stdout of the given command when done; but respects the given limiter +# $env is the PSGI env. As with ``/qx; only use this when output is small +# and safe to slurp. sub psgi_qx { my ($self, $env, $limiter, $qx_cb) = @_; - my $qx = PublicInbox::Qspawn::Qx->new; + my $scalar = ''; + open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar my $end = sub { - _psgi_finish($self, $env); - eval { $qx_cb->($qx) }; + my $err = $_[0]; # $! + log_err($env, "psgi_qx: $err") if defined($err); + finish($self, $env, sub { $qx_cb->(\$scalar) }); $qx = undef; }; - my $rpipe; + my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; my $cb = sub { - my $r = sysread($rpipe, my $buf, 8192); + my ($r, $buf); +reread: + $r = sysread($rpipe, $buf, 65536); if ($async) { $async->async_pass($env->{'psgix.io'}, $qx, \$buf); } elsif (defined $r) { $r ? $qx->write($buf) : $end->(); } else { - return if $!{EAGAIN} || $!{EINTR}; # loop again - $end->(); + return if $! == EAGAIN; # try again when notified + goto reread if $! == EINTR; + $end->($!); } }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - $self->start($limiter, sub { # may run later, much later... - ($rpipe) = @_; + $self->start($limiter, sub { # start_cb, may run later, much later... + ($rpipe) = @_; # popen_rd result if ($async) { # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) $async = $async->($rpipe, $cb, $end); + # $cb will call ->async_pass or ->close } else { # generic PSGI $cb->() while $qx; } @@ -125,25 +206,67 @@ sub filter_fh ($$) { }); } +# Used for streaming the stdout of one process as a PSGI response. +# +# $env is the PSGI env. +# optional keys in $env: +# $env->{'qspawn.wcb'} - the write callback from the PSGI server +# optional, use this if you've already +# captured it elsewhere. If not given, +# psgi_return will return an anonymous +# sub for the PSGI server to call +# +# $env->{'qspawn.filter'} - filter callback, receives a string as input, +# undef on EOF +# +# $limiter - the Limiter object to use (uses the def_limiter if not given) +# +# $parse_hdr - Initial read function; often for parsing CGI header output. +# It will be given the return value of sysread from the pipe +# and a string ref of the current buffer. Returns an arrayref +# for PSGI responses. 2-element arrays in PSGI mean the +# body will be streamed, later, via writes (push-based) to +# psgix.io. 3-element arrays means the body is available +# immediately (or streamed via ->getline (pull-based)). sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - _psgi_finish($self, $env); + my $err = $_[0]; # $! + log_err($env, "psgi_return: $err") if defined($err); + finish($self, $env); $fh->close if $fh; # async-only }; my $buf = ''; my $rd_hdr = sub { - my $r = sysread($rpipe, $buf, 1024, length($buf)); - return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); - $parse_hdr->($r, \$buf); + # typically used for reading CGI headers + # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm + # We also need to check EINTR for generic PSGI servers. + my $ret; + my $total_rd = 0; + do { + my $r = sysread($rpipe, $buf, 4096, length($buf)); + if (defined($r)) { + $total_rd += $r; + $ret = $parse_hdr->($r ? $total_rd : 0, \$buf); + } else { + # caller should notify us when it's ready: + return if $! == EAGAIN; + next if $! == EINTR; # immediate retry + log_err($env, "error reading header: $!"); + $ret = [ 500, [], [ "Internal error\n" ] ]; + } + } until (defined $ret); + $ret; }; - my $res = delete $env->{'qspawn.response'}; + + my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it my $async = $env->{'pi-httpd.async'}; + my $cb = sub { my $r = $rd_hdr->() or return; - $rd_hdr = undef; + $rd_hdr = undef; # done reading headers my $filter = delete $env->{'qspawn.filter'}; if (scalar(@$r) == 3) { # error if ($async) { @@ -152,17 +275,22 @@ sub psgi_return { $rpipe->close; $end->(); } - $res->($r); + $wcb->($r); } elsif ($async) { - $fh = $res->($r); # scalar @$r == 2 + # done reading headers, handoff to read body + $fh = $wcb->($r); # scalar @$r == 2 $fh = filter_fh($fh, $filter) if $filter; $async->async_pass($env->{'psgix.io'}, $fh, \$buf); } else { # for synchronous PSGI servers require PublicInbox::GetlineBody; $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, $buf, $filter); - $res->($r); + $wcb->($r); } + + # Workaround a leak under Perl 5.16.3 when combined with + # Plack::Middleware::Deflater: + $wcb = undef; }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); my $start_cb = sub { # may run later, much later... @@ -170,15 +298,22 @@ sub psgi_return { if ($async) { # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) $async = $async->($rpipe, $cb, $end); + # $cb will call ->async_pass or ->close } else { # generic PSGI $cb->() while $rd_hdr; } }; - return $self->start($limiter, $start_cb) if $res; + # the caller already captured the PSGI write callback from + # the PSGI server, so we can call ->start, here: + return $self->start($limiter, $start_cb) if $wcb; + # the caller will return this sub to the PSGI server, so + # it can set the response callback (that is, for PublicInbox::HTTP, + # the chunked_wcb or identity_wcb callback), but other HTTP servers + # are supported: sub { - ($res) = @_; + ($wcb) = @_; $self->start($limiter, $start_cb); }; } @@ -194,24 +329,36 @@ sub new { max => $max || 32, running => 0, run_queue => [], + # RLIMIT_CPU => undef, + # RLIMIT_DATA => undef, + # RLIMIT_CORE => undef, }, $class; } -# captures everything into a buffer and executes a callback when done -package PublicInbox::Qspawn::Qx; -use strict; -use warnings; - -sub new { - my ($class) = @_; - my $buf = ''; - bless \$buf, $class; -} - -# called by PublicInbox::HTTPD::Async ($fh->write) -sub write { - ${$_[0]} .= $_[1]; - undef; +sub setup_rlimit { + my ($self, $name, $config) = @_; + foreach my $rlim (PublicInbox::Spawn::RLIMITS()) { + my $k = lc($rlim); + $k =~ tr/_//d; + $k = "publicinboxlimiter.$name.$k"; + defined(my $v = $config->{$k}) or next; + my @rlimit = split(/\s*,\s*/, $v); + if (scalar(@rlimit) == 1) { + push @rlimit, $rlimit[0]; + } elsif (scalar(@rlimit) != 2) { + warn "could not parse $k: $v\n"; + } + eval { require BSD::Resource }; + if ($@) { + warn "BSD::Resource missing for $rlim"; + next; + } + foreach my $i (0..$#rlimit) { + next if $rlimit[$i] ne 'INFINITY'; + $rlimit[$i] = BSD::Resource::RLIM_INFINITY(); + } + $self->{$rlim} = \@rlimit; + } } 1;