1 # Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
4 # Like most Perl modules in public-inbox, this is internal and
5 # NOT subject to any stability guarantees! It is only documented
8 # This is used to limit the number of processes spawned by the
9 # PSGI server, so it acts like a semaphore and queues up extra
10 # commands to be run if currently at the limit. Multiple "limiters"
11 # may be configured which give inboxes different channels to
12 # operate in. This can be useful to ensure smaller inboxes can
13 # be cloned while cloning of large inboxes is maxed out.
15 # This does not depend on Danga::Socket or any other external
16 # scheduling mechanism; you just need to call start() and finish()
17 # appropriately. However, public-inbox-httpd (which uses Danga::Socket)
18 # will be able to schedule this based on readability of stdout from
19 # the spawned process. See GitHTTPBackend.pm and SolverGit.pm for
20 # usage examples. It does not depend on any form of threading.
22 # This is useful for scheduling CGI execution of both long-lived
23 # git-http-backend(1) processes (for "git clone") as well as short-lived
24 # processes such as git-apply(1).
26 package PublicInbox::Qspawn;
29 use PublicInbox::Spawn qw(popen_rd);
34 # Declares a command to spawn (but does not spawn it yet).
35 # $cmd is the command to spawn
36 # $env is the environment for the child process
37 # $opt can include redirects and perhaps other process spawning options
# Returns a blessed object whose {args} holds [ $cmd, $env, $opt ]; that
# list is later passed as-is to popen_rd() when the command is started.
39 my ($class, $cmd, $env, $opt) = @_;
40 bless { args => [ $cmd, $env, $opt ] }, $class;
# Spawns the declared command now. popen_rd() returns a readable pipe handle
# (stored in {rpipe}) and the child PID (stored in {pid}).
47 ($self->{rpipe}, $self->{pid}) = popen_rd(@{$self->{args}});
48 if (defined $self->{pid}) {
# only a successful spawn occupies a slot in the limiter
49 $self->{limiter}->{running}++;
# NOTE(review): presumably also reached when the spawn failed, in which case
# {rpipe} may be undef -- confirm every $cb handles that
53 $cb->($self->{rpipe});
# Formats a wait status (as found in $?) into a human-readable error string.
# Returns nothing (empty list / undef) when the exit code is zero.
57 my ($child_error) = @_; # typically $?
# bits 8 and up of the wait status hold the exit code
58 my $exitstatus = ($child_error >> 8) or return;
# the low 7 bits hold the number of the terminating signal, if any
59 my $sig = $child_error & 127;
# NOTE(review): a child killed by a signal normally has an exit code of 0,
# so the early return above fires and the signal is never reported; the
# " signal=" suffix below is effectively unreachable for real wait statuses.
# TODO: consider checking $sig before returning.
60 my $msg = "exit status=$exitstatus";
61 $msg .= " signal=$sig" if $sig;
# Reaps the spawned child (if it was started), records any error in {err},
# and gives its slot back to the limiter. If the limiter then has spare
# capacity, the next entry is shifted off the limiter's {run_queue}.
67 my $limiter = $self->{limiter};
# {rpipe} is only set once the command has been spawned; deleting it means a
# repeated call skips the reap and the {running} decrement
69 if (delete $self->{rpipe}) {
70 my $pid = delete $self->{pid};
# waitpid with flags 0 blocks until the child exits.
# NOTE(review): a mismatch therefore means waitpid returned -1 (no such child,
# e.g. already reaped), so the "still running?" wording is misleading
71 $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
72 "PID:$pid still running?";
73 $running = --$limiter->{running};
76 # limiter->{max} may change dynamically
# NOTE(review): when $running is set it equals $limiter->{running}, so this is
# effectively $limiter->{running} < $limiter->{max}
77 if (($running || $limiter->{running}) < $limiter->{max}) {
78 if (my $next = shift @{$limiter->{run_queue}}) {
# Associates this object with $limiter. If the limiter has fewer than {max}
# running processes, the command is spawned immediately; otherwise
# [ $self, $cb ] is appended to the limiter's {run_queue} for later.
86 my ($self, $limiter, $cb) = @_;
87 $self->{limiter} = $limiter;
89 if ($limiter->{running} < $limiter->{max}) {
90 _do_spawn($self, $cb);
92 push @{$limiter->{run_queue}}, [ $self, $cb ];
# Calls finish(). Unless $env->{'qspawn.quiet'} is true, any resulting error
# is written to the PSGI error stream, prefixed by the space-joined command.
# (Assumes $cmd was given to new() as an arrayref, since it is dereferenced.)
96 sub _psgi_finish ($$) {
97 my ($self, $env) = @_;
98 my $err = $self->finish;
99 if ($err && !$env->{'qspawn.quiet'}) {
100 $err = join(' ', @{$self->{args}->[0]}).": $err\n";
101 $env->{'psgi.errors'}->print($err);
105 # Similar to `backtick` or "qx" ("perldoc -f qx"): it calls $qx_cb with
106 # the stdout of the given command when done, but respects the given limiter.
107 # $env is the PSGI env. As with ``/qx, only use this when output is small.
110 my ($self, $env, $limiter, $qx_cb) = @_;
# $qx accumulates the command's stdout and is the object handed to $qx_cb
111 my $qx = PublicInbox::Qspawn::Qx->new;
# NOTE(review): presumably the completion path ($end): reap and log the child,
# then run $qx_cb with any exception it throws trapped by eval
113 _psgi_finish($self, $env);
114 eval { $qx_cb->($qx) };
# presumably provided only by public-inbox-httpd; generic PSGI servers lack it
118 my $async = $env->{'pi-httpd.async'};
# read stdout in chunks of up to 8192 bytes
120 my $r = sysread($rpipe, my $buf, 8192);
122 $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
123 } elsif (defined $r) {
# data is appended to $qx; a zero-length read (EOF) triggers $end
124 $r ? $qx->write($buf) : $end->();
126 return if $!{EAGAIN} || $!{EINTR}; # loop again
# lazily create one shared default limiter (32 is passed as its max)
130 $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
131 $self->start($limiter, sub { # may run later, much later...
134 # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
135 $async = $async->($rpipe, $cb, $end);
136 } else { # generic PSGI
142 # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
# Wraps $fh so each written chunk is passed through $filter before reaching
# $fh->write. $filter->(undef) is also written once, presumably on close, to
# flush any output the filter still holds (NOTE(review): confirm).
144 my ($fh, $filter) = @_;
145 Plack::Util::inline_object(
147 $fh->write($filter->(undef));
151 $fh->write($filter->($_[0]));
155 # Used for streaming the stdout of one process as a PSGI response.
157 # $env is the PSGI env.
158 # optional keys in $env:
159 # $env->{'qspawn.wcb'} - the write callback from the PSGI server
160 # optional, use this if you've already
161 # captured it elsewhere. If not given,
162 # psgi_return will return an anonymous
163 # sub for the PSGI server to call
165 # $env->{'qspawn.filter'} - filter callback, receives a string as input,
168 # $limiter - the Limiter object to use (uses the def_limiter if not given)
170 # $parse_hdr - Initial read function; often for parsing CGI header output.
171 # It will be given the return value of sysread from the pipe
172 # and a string ref of the current buffer. Returns an arrayref
173 # for PSGI responses. 2-element arrays in PSGI mean the
174 # body will be streamed, later, via writes (push-based) to
175 # psgix.io. 3-element arrays mean the body is available
176 # immediately (or streamed via ->getline (pull-based)).
178 my ($self, $env, $limiter, $parse_hdr) = @_;
181 _psgi_finish($self, $env);
182 $fh->close if $fh; # async-only
# append up to 1024 more bytes to $buf (the offset argument keeps earlier data)
187 my $r = sysread($rpipe, $buf, 1024, length($buf));
# transient condition: return and wait to be called again
188 return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
189 $parse_hdr->($r, \$buf);
192 my $wcb = delete $env->{'qspawn.wcb'};
193 my $async = $env->{'pi-httpd.async'};
# a false result (e.g. on EINTR/EAGAIN) means there is nothing to respond with yet
196 my $r = $rd_hdr->() or return;
198 my $filter = delete $env->{'qspawn.filter'};
199 if (scalar(@$r) == 3) { # error
201 $async->close; # calls rpipe->close and $end
208 $fh = $wcb->($r); # scalar @$r == 2
209 $fh = filter_fh($fh, $filter) if $filter;
210 $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
211 } else { # for synchronous PSGI servers
212 require PublicInbox::GetlineBody;
213 $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
218 $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
219 my $start_cb = sub { # may run later, much later...
222 # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
223 $async = $async->($rpipe, $cb, $end);
224 } else { # generic PSGI
# keep reading until $rd_hdr is cleared (presumably by $cb once a response
# is ready -- NOTE(review): confirm)
225 $cb->() while $rd_hdr;
229 # the caller already captured the PSGI write callback from
230 # the PSGI server, so we can call ->start, here:
231 return $self->start($limiter, $start_cb) if $wcb;
233 # the caller will return this sub to the PSGI server, so
234 # it can set the response callback (that is, for PublicInbox::HTTP,
235 # the chunked_wcb or identity_wcb callback), but other HTTP servers
239 $self->start($limiter, $start_cb);
243 package PublicInbox::Qspawn::Limiter;
# A Limiter caps concurrency. start() spawns only while {running} < {max};
# otherwise entries wait on {run_queue} until finish() frees a slot.
248 my ($class, $max) = @_;
250 # 32 is the same as git-daemon's default --max-connections limit
257 # captures everything written to it into a buffer and executes a callback when done
# (the qx-like PSGI helper in this file hands this object to its $qx_cb)
258 package PublicInbox::Qspawn::Qx;
268 # called by PublicInbox::HTTPD::Async ($fh->write)