X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FQspawn.pm;h=9aede103521b47d3684032ca29ac6ace7f0a1747;hb=cd50d183273c105a7f08b1875ba6f7a51d9f8e9a;hp=9299096a40d9d36b07e488b626ed0e1ecf3e1fed;hpb=b04e5cf1bc8969cca74ef764f2de960b1ea821a4;p=public-inbox.git diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9299096a..9aede103 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -1,13 +1,40 @@ -# Copyright (C) 2016 all contributors +# Copyright (C) 2016-2019 all contributors # License: AGPL-3.0+ + +# Like most Perl modules in public-inbox, this is internal and +# NOT subject to any stability guarantees! It is only documented +# for other hackers. +# +# This is used to limit the number of processes spawned by the +# PSGI server, so it acts like a semaphore and queues up extra +# commands to be run if currently at the limit. Multiple "limiters" +# may be configured which give inboxes different channels to +# operate in. This can be useful to ensure smaller inboxes can +# be cloned while cloning of large inboxes is maxed out. +# +# This does not depend on PublicInbox::DS or any other external +# scheduling mechanism, you just need to call start() and finish() +# appropriately. However, public-inbox-httpd (which uses PublicInbox::DS) +# will be able to schedule this based on readability of stdout from +# the spawned process. See GitHTTPBackend.pm and SolverGit.pm for +# usage examples. It does not depend on any form of threading. +# +# This is useful for scheduling CGI execution of both long-lived +# git-http-backend(1) process (for "git clone") as well as short-lived +# processes such as git-apply(1). + package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); -our $LIMIT = 1; -my $running = 0; -my @run_queue; +require Plack::Util; + +my $def_limiter; +# declares a command to spawn (but does not spawn it). +# $cmd is the command to spawn +# $env is the environ for the child process +# $opt can include redirects and perhaps other process spawning options sub new ($$$;) { my ($class, $cmd, $env, $opt) = @_; bless { args => [ $cmd, $env, $opt ] }, $class; @@ -16,37 +43,269 @@ sub new ($$$;) { sub _do_spawn { my ($self, $cb) = @_; my $err; - ($self->{rpipe}, $self->{pid}) = popen_rd(@{$self->{args}}); + my ($cmd, $env, $opts) = @{$self->{args}}; + my %opts = %{$opts || {}}; + my $limiter = $self->{limiter}; + foreach my $k (PublicInbox::Spawn::RLIMITS()) { + if (defined(my $rlimit = $limiter->{$k})) { + $opts{$k} = $rlimit; + } + } + + ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $env, \%opts); if (defined $self->{pid}) { - $running++; + $limiter->{running}++; } else { $self->{err} = $!; } $cb->($self->{rpipe}); } +sub child_err ($) { + my ($child_error) = @_; # typically $? + my $exitstatus = ($child_error >> 8) or return; + my $sig = $child_error & 127; + my $msg = "exit status=$exitstatus"; + $msg .= " signal=$sig" if $sig; + $msg; +} + sub finish ($) { my ($self) = @_; + my $limiter = $self->{limiter}; + my $running; if (delete $self->{rpipe}) { my $pid = delete $self->{pid}; - $self->{err} = $pid == waitpid($pid, 0) ? $? : + $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) : "PID:$pid still running?"; - $running--; + $running = --$limiter->{running}; } - if (my $next = shift @run_queue) { - _do_spawn(@$next); + + # limiter->{max} may change dynamically + if (($running || $limiter->{running}) < $limiter->{max}) { + if (my $next = shift @{$limiter->{run_queue}}) { + _do_spawn(@$next); + } } $self->{err}; } -sub start ($$) { - my ($self, $cb) = @_; +sub start { + my ($self, $limiter, $cb) = @_; + $self->{limiter} = $limiter; - if ($running < $LIMIT) { + if ($limiter->{running} < $limiter->{max}) { _do_spawn($self, $cb); } else { - push @run_queue, [ $self, $cb ]; + push @{$limiter->{run_queue}}, [ $self, $cb ]; + } +} + +sub _psgi_finish ($$) { + my ($self, $env) = @_; + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { + $err = join(' ', @{$self->{args}->[0]}).": $err\n"; + $env->{'psgi.errors'}->print($err); + } +} + +# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with +# the stdout of the given command when done; but respects the given limiter +# $env is the PSGI env. As with ``/qx; only use this when output is small +# and safe to slurp. +sub psgi_qx { + my ($self, $env, $limiter, $qx_cb) = @_; + my $qx = PublicInbox::Qspawn::Qx->new; + my $end = sub { + _psgi_finish($self, $env); + eval { $qx_cb->($qx) }; + $qx = undef; + }; + my $rpipe; + my $async = $env->{'pi-httpd.async'}; + my $cb = sub { + my $r = sysread($rpipe, my $buf, 8192); + if ($async) { + $async->async_pass($env->{'psgix.io'}, $qx, \$buf); + } elsif (defined $r) { + $r ? $qx->write($buf) : $end->(); + } else { + return if $!{EAGAIN} || $!{EINTR}; # loop again + $end->(); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + $self->start($limiter, sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $qx; + } + }); +} + +# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async +sub filter_fh ($$) { + my ($fh, $filter) = @_; + Plack::Util::inline_object( + close => sub { + $fh->write($filter->(undef)); + $fh->close; + }, + write => sub { + $fh->write($filter->($_[0])); + }); +} + +# Used for streaming the stdout of one process as a PSGI response. +# +# $env is the PSGI env. +# optional keys in $env: +# $env->{'qspawn.wcb'} - the write callback from the PSGI server +# optional, use this if you've already +# captured it elsewhere. If not given, +# psgi_return will return an anonymous +# sub for the PSGI server to call +# +# $env->{'qspawn.filter'} - filter callback, receives a string as input, +# undef on EOF +# +# $limiter - the Limiter object to use (uses the def_limiter if not given) +# +# $parse_hdr - Initial read function; often for parsing CGI header output. +# It will be given the return value of sysread from the pipe +# and a string ref of the current buffer. Returns an arrayref +# for PSGI responses. 2-element arrays in PSGI mean the +# body will be streamed, later, via writes (push-based) to +# psgix.io. 3-element arrays means the body is available +# immediately (or streamed via ->getline (pull-based)). +sub psgi_return { + my ($self, $env, $limiter, $parse_hdr) = @_; + my ($fh, $rpipe); + my $end = sub { + _psgi_finish($self, $env); + $fh->close if $fh; # async-only + }; + + my $buf = ''; + my $rd_hdr = sub { + my $r = sysread($rpipe, $buf, 1024, length($buf)); + return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); + $parse_hdr->($r, \$buf); + }; + + my $wcb = delete $env->{'qspawn.wcb'}; + my $async = $env->{'pi-httpd.async'}; + + my $cb = sub { + my $r = $rd_hdr->() or return; + $rd_hdr = undef; + my $filter = delete $env->{'qspawn.filter'}; + if (scalar(@$r) == 3) { # error + if ($async) { + $async->close; # calls rpipe->close and $end + } else { + $rpipe->close; + $end->(); + } + $wcb->($r); + } elsif ($async) { + $fh = $wcb->($r); # scalar @$r == 2 + $fh = filter_fh($fh, $filter) if $filter; + $async->async_pass($env->{'psgix.io'}, $fh, \$buf); + } else { # for synchronous PSGI servers + require PublicInbox::GetlineBody; + $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, + $buf, $filter); + $wcb->($r); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + my $start_cb = sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $rd_hdr; + } + }; + + # the caller already captured the PSGI write callback from + # the PSGI server, so we can call ->start, here: + return $self->start($limiter, $start_cb) if $wcb; + + # the caller will return this sub to the PSGI server, so + # it can set the response callback (that is, for PublicInbox::HTTP, + # the chunked_wcb or identity_wcb callback), but other HTTP servers + # are supported: + sub { + ($wcb) = @_; + $self->start($limiter, $start_cb); + }; +} + +package PublicInbox::Qspawn::Limiter; +use strict; +use warnings; + +sub new { + my ($class, $max) = @_; + bless { + # 32 is same as the git-daemon connection limit + max => $max || 32, + running => 0, + run_queue => [], + # RLIMIT_CPU => undef, + # RLIMIT_DATA => undef, + # RLIMIT_CORE => undef, + }, $class; +} + +sub setup_rlimit { + my ($self, $name, $config) = @_; + foreach my $rlim (PublicInbox::Spawn::RLIMITS()) { + my $k = lc($rlim); + $k =~ tr/_//d; + $k = "publicinboxlimiter.$name.$k"; + defined(my $v = $config->{$k}) or next; + my @rlimit = split(/\s*,\s*/, $v); + if (scalar(@rlimit) == 1) { + push @rlimit, $rlimit[0]; + } elsif (scalar(@rlimit) != 2) { + warn "could not parse $k: $v\n"; + } + eval { require BSD::Resource }; + if ($@) { + warn "BSD::Resource missing for $rlim"; + next; + } + foreach my $i (0..$#rlimit) { + next if $rlimit[$i] ne 'INFINITY'; + $rlimit[$i] = BSD::Resource::RLIM_INFINITY(); + } + $self->{$rlim} = \@rlimit; } } +# captures everything into a buffer and executes a callback when done +package PublicInbox::Qspawn::Qx; +use strict; +use warnings; + +sub new { + my ($class) = @_; + my $buf = ''; + bless \$buf, $class; +} + +# called by PublicInbox::HTTPD::Async ($fh->write) +sub write { + ${$_[0]} .= $_[1]; + undef; +} + 1;