]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Qspawn.pm
qspawn: documentation updates
[public-inbox.git] / lib / PublicInbox / Qspawn.pm
1 # Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Like most Perl modules in public-inbox, this is internal and
5 # NOT subject to any stability guarantees!  It is only documented
6 # for other hackers.
7 #
8 # This is used to limit the number of processes spawned by the
9 # PSGI server, so it acts like a semaphore and queues up extra
10 # commands to be run if currently at the limit.  Multiple "limiters"
11 # may be configured which give inboxes different channels to
12 # operate in.  This can be useful to ensure smaller inboxes can
13 # be cloned while cloning of large inboxes is maxed out.
14 #
15 # This does not depend on Danga::Socket or any other external
16 # scheduling mechanism, you just need to call start() and finish()
17 # appropriately. However, public-inbox-httpd (which uses Danga::Socket)
18 # will be able to schedule this based on readability of stdout from
19 # the spawned process.  See GitHTTPBackend.pm and SolverGit.pm for
20 # usage examples.  It does not depend on any form of threading.
21 #
22 # This is useful for scheduling CGI execution of both long-lived
23 # git-http-backend(1) process (for "git clone") as well as short-lived
24 # processes such as git-apply(1).
25
26 package PublicInbox::Qspawn;
27 use strict;
28 use warnings;
29 use PublicInbox::Spawn qw(popen_rd);
30 require Plack::Util;
31
32 my $def_limiter;
33
34 # declares a command to spawn (but does not spawn it).
35 # $cmd is the command to spawn
36 # $env is the environ for the child process
37 # $opt can include redirects and perhaps other process spawning options
38 sub new ($$$;) {
39         my ($class, $cmd, $env, $opt) = @_;
40         bless { args => [ $cmd, $env, $opt ] }, $class;
41 }
42
43 sub _do_spawn {
44         my ($self, $cb) = @_;
45         my $err;
46
47         ($self->{rpipe}, $self->{pid}) = popen_rd(@{$self->{args}});
48         if (defined $self->{pid}) {
49                 $self->{limiter}->{running}++;
50         } else {
51                 $self->{err} = $!;
52         }
53         $cb->($self->{rpipe});
54 }
55
56 sub child_err ($) {
57         my ($child_error) = @_; # typically $?
58         my $exitstatus = ($child_error >> 8) or return;
59         my $sig = $child_error & 127;
60         my $msg = "exit status=$exitstatus";
61         $msg .= " signal=$sig" if $sig;
62         $msg;
63 }
64
65 sub finish ($) {
66         my ($self) = @_;
67         my $limiter = $self->{limiter};
68         my $running;
69         if (delete $self->{rpipe}) {
70                 my $pid = delete $self->{pid};
71                 $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
72                                 "PID:$pid still running?";
73                 $running = --$limiter->{running};
74         }
75
76         # limiter->{max} may change dynamically
77         if (($running || $limiter->{running}) < $limiter->{max}) {
78                 if (my $next = shift @{$limiter->{run_queue}}) {
79                         _do_spawn(@$next);
80                 }
81         }
82         $self->{err};
83 }
84
85 sub start {
86         my ($self, $limiter, $cb) = @_;
87         $self->{limiter} = $limiter;
88
89         if ($limiter->{running} < $limiter->{max}) {
90                 _do_spawn($self, $cb);
91         } else {
92                 push @{$limiter->{run_queue}}, [ $self, $cb ];
93         }
94 }
95
96 sub _psgi_finish ($$) {
97         my ($self, $env) = @_;
98         my $err = $self->finish;
99         if ($err && !$env->{'qspawn.quiet'}) {
100                 $err = join(' ', @{$self->{args}->[0]}).": $err\n";
101                 $env->{'psgi.errors'}->print($err);
102         }
103 }
104
105 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
106 # the stdout of the given command when done; but respects the given limiter
107 # $env is the PSGI env.  As with ``/qx; only use this when output is small
108 # and safe to slurp.
109 sub psgi_qx {
110         my ($self, $env, $limiter, $qx_cb) = @_;
111         my $qx = PublicInbox::Qspawn::Qx->new;
112         my $end = sub {
113                 _psgi_finish($self, $env);
114                 eval { $qx_cb->($qx) };
115                 $qx = undef;
116         };
117         my $rpipe;
118         my $async = $env->{'pi-httpd.async'};
119         my $cb = sub {
120                 my $r = sysread($rpipe, my $buf, 8192);
121                 if ($async) {
122                         $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
123                 } elsif (defined $r) {
124                         $r ? $qx->write($buf) : $end->();
125                 } else {
126                         return if $!{EAGAIN} || $!{EINTR}; # loop again
127                         $end->();
128                 }
129         };
130         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
131         $self->start($limiter, sub { # may run later, much later...
132                 ($rpipe) = @_;
133                 if ($async) {
134                 # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
135                         $async = $async->($rpipe, $cb, $end);
136                 } else { # generic PSGI
137                         $cb->() while $qx;
138                 }
139         });
140 }
141
142 # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
143 sub filter_fh ($$) {
144         my ($fh, $filter) = @_;
145         Plack::Util::inline_object(
146                 close => sub {
147                         $fh->write($filter->(undef));
148                         $fh->close;
149                 },
150                 write => sub {
151                         $fh->write($filter->($_[0]));
152                 });
153 }
154
155 # Used for streaming the stdout of one process as a PSGI response.
156 #
157 # $env is the PSGI env.
158 # optional keys in $env:
159 #   $env->{'qspawn.wcb'} - the write callback from the PSGI server
160 #                          optional, use this if you've already
161 #                          captured it elsewhere.  If not given,
162 #                          psgi_return will return an anonymous
163 #                          sub for the PSGI server to call
164 #
165 #   $env->{'qspawn.filter'} - filter callback, receives a string as input,
166 #                             undef on EOF
167 #
168 # $limiter - the Limiter object to use (uses the def_limiter if not given)
169 #
170 # $parse_hdr - Initial read function; often for parsing CGI header output.
171 #              It will be given the return value of sysread from the pipe
172 #              and a string ref of the current buffer.  Returns an arrayref
173 #              for PSGI responses.  2-element arrays in PSGI mean the
174 #              body will be streamed, later, via writes (push-based) to
175 #              psgix.io.  3-element arrays means the body is available
176 #              immediately (or streamed via ->getline (pull-based)).
177 sub psgi_return {
178         my ($self, $env, $limiter, $parse_hdr) = @_;
179         my ($fh, $rpipe);
180         my $end = sub {
181                 _psgi_finish($self, $env);
182                 $fh->close if $fh; # async-only
183         };
184
185         my $buf = '';
186         my $rd_hdr = sub {
187                 my $r = sysread($rpipe, $buf, 1024, length($buf));
188                 return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
189                 $parse_hdr->($r, \$buf);
190         };
191
192         my $wcb = delete $env->{'qspawn.wcb'};
193         my $async = $env->{'pi-httpd.async'};
194
195         my $cb = sub {
196                 my $r = $rd_hdr->() or return;
197                 $rd_hdr = undef;
198                 my $filter = delete $env->{'qspawn.filter'};
199                 if (scalar(@$r) == 3) { # error
200                         if ($async) {
201                                 $async->close; # calls rpipe->close and $end
202                         } else {
203                                 $rpipe->close;
204                                 $end->();
205                         }
206                         $wcb->($r);
207                 } elsif ($async) {
208                         $fh = $wcb->($r); # scalar @$r == 2
209                         $fh = filter_fh($fh, $filter) if $filter;
210                         $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
211                 } else { # for synchronous PSGI servers
212                         require PublicInbox::GetlineBody;
213                         $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
214                                                                 $buf, $filter);
215                         $wcb->($r);
216                 }
217         };
218         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
219         my $start_cb = sub { # may run later, much later...
220                 ($rpipe) = @_;
221                 if ($async) {
222                         # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
223                         $async = $async->($rpipe, $cb, $end);
224                 } else { # generic PSGI
225                         $cb->() while $rd_hdr;
226                 }
227         };
228
229         # the caller already captured the PSGI write callback from
230         # the PSGI server, so we can call ->start, here:
231         return $self->start($limiter, $start_cb) if $wcb;
232
233         # the caller will return this sub to the PSGI server, so
234         # it can set the response callback (that is, for PublicInbox::HTTP,
235         # the chunked_wcb or identity_wcb callback), but other HTTP servers
236         # are supported:
237         sub {
238                 ($wcb) = @_;
239                 $self->start($limiter, $start_cb);
240         };
241 }
242
243 package PublicInbox::Qspawn::Limiter;
244 use strict;
245 use warnings;
246
247 sub new {
248         my ($class, $max) = @_;
249         bless {
250                 # 32 is same as the git-daemon connection limit
251                 max => $max || 32,
252                 running => 0,
253                 run_queue => [],
254         }, $class;
255 }
256
257 # captures everything into a buffer and executes a callback when done
258 package PublicInbox::Qspawn::Qx;
259 use strict;
260 use warnings;
261
262 sub new {
263         my ($class) = @_;
264         my $buf = '';
265         bless \$buf, $class;
266 }
267
268 # called by PublicInbox::HTTPD::Async ($fh->write)
269 sub write {
270         ${$_[0]} .= $_[1];
271         undef;
272 }
273
274 1;