]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Qspawn.pm
ds|nntp: use CORE::close on socket
[public-inbox.git] / lib / PublicInbox / Qspawn.pm
1 # Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Like most Perl modules in public-inbox, this is internal and
5 # NOT subject to any stability guarantees!  It is only documented
6 # for other hackers.
7 #
8 # This is used to limit the number of processes spawned by the
9 # PSGI server, so it acts like a semaphore and queues up extra
10 # commands to be run if currently at the limit.  Multiple "limiters"
11 # may be configured which give inboxes different channels to
12 # operate in.  This can be useful to ensure smaller inboxes can
13 # be cloned while cloning of large inboxes is maxed out.
14 #
15 # This does not depend on PublicInbox::DS or any other external
16 # scheduling mechanism, you just need to call start() and finish()
17 # appropriately. However, public-inbox-httpd (which uses PublicInbox::DS)
18 # will be able to schedule this based on readability of stdout from
19 # the spawned process.  See GitHTTPBackend.pm and SolverGit.pm for
20 # usage examples.  It does not depend on any form of threading.
21 #
22 # This is useful for scheduling CGI execution of both long-lived
23 # git-http-backend(1) process (for "git clone") as well as short-lived
24 # processes such as git-apply(1).
25
26 package PublicInbox::Qspawn;
27 use strict;
28 use warnings;
29 use PublicInbox::Spawn qw(popen_rd);
30 require Plack::Util;
31
32 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
33 use Errno qw(EAGAIN EINTR);
34
35 my $def_limiter;
36
37 # declares a command to spawn (but does not spawn it).
38 # $cmd is the command to spawn
39 # $env is the environ for the child process
40 # $opt can include redirects and perhaps other process spawning options
41 sub new ($$$;) {
42         my ($class, $cmd, $env, $opt) = @_;
43         bless { args => [ $cmd, $env, $opt ] }, $class;
44 }
45
46 sub _do_spawn {
47         my ($self, $cb) = @_;
48         my $err;
49         my ($cmd, $env, $opts) = @{$self->{args}};
50         my %opts = %{$opts || {}};
51         my $limiter = $self->{limiter};
52         foreach my $k (PublicInbox::Spawn::RLIMITS()) {
53                 if (defined(my $rlimit = $limiter->{$k})) {
54                         $opts{$k} = $rlimit;
55                 }
56         }
57
58         ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $env, \%opts);
59         if (defined $self->{pid}) {
60                 $limiter->{running}++;
61         } else {
62                 $self->{err} = $!;
63         }
64         $cb->($self->{rpipe});
65 }
66
67 sub child_err ($) {
68         my ($child_error) = @_; # typically $?
69         my $exitstatus = ($child_error >> 8) or return;
70         my $sig = $child_error & 127;
71         my $msg = "exit status=$exitstatus";
72         $msg .= " signal=$sig" if $sig;
73         $msg;
74 }
75
76 sub finish ($) {
77         my ($self) = @_;
78         my $limiter = $self->{limiter};
79         my $running;
80         if (delete $self->{rpipe}) {
81                 my $pid = delete $self->{pid};
82                 $self->{err} = $pid == waitpid($pid, 0) ? child_err($?) :
83                                 "PID:$pid still running?";
84                 $running = --$limiter->{running};
85         }
86
87         # limiter->{max} may change dynamically
88         if (($running || $limiter->{running}) < $limiter->{max}) {
89                 if (my $next = shift @{$limiter->{run_queue}}) {
90                         _do_spawn(@$next);
91                 }
92         }
93         $self->{err};
94 }
95
96 sub start {
97         my ($self, $limiter, $cb) = @_;
98         $self->{limiter} = $limiter;
99
100         if ($limiter->{running} < $limiter->{max}) {
101                 _do_spawn($self, $cb);
102         } else {
103                 push @{$limiter->{run_queue}}, [ $self, $cb ];
104         }
105 }
106
107 sub _psgi_finish ($$) {
108         my ($self, $env) = @_;
109         my $err = $self->finish;
110         if ($err && !$env->{'qspawn.quiet'}) {
111                 $err = join(' ', @{$self->{args}->[0]}).": $err\n";
112                 $env->{'psgi.errors'}->print($err);
113         }
114 }
115
116 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
117 # the stdout of the given command when done; but respects the given limiter
118 # $env is the PSGI env.  As with ``/qx; only use this when output is small
119 # and safe to slurp.
120 sub psgi_qx {
121         my ($self, $env, $limiter, $qx_cb) = @_;
122         my $qx = PublicInbox::Qspawn::Qx->new;
123         my $end = sub {
124                 _psgi_finish($self, $env);
125                 eval { $qx_cb->($qx) };
126                 $qx = undef;
127         };
128         my $rpipe; # comes from popen_rd
129         my $async = $env->{'pi-httpd.async'};
130         my $cb = sub {
131                 my $r = sysread($rpipe, my $buf, 8192);
132                 if ($async) {
133                         $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
134                 } elsif (defined $r) {
135                         $r ? $qx->write($buf) : $end->();
136                 } else {
137                         return if $! == EAGAIN || $! == EINTR; # loop again
138                         $end->();
139                 }
140         };
141         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
142         $self->start($limiter, sub { # may run later, much later...
143                 ($rpipe) = @_; # popen_rd result
144                 if ($async) {
145                 # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
146                         $async = $async->($rpipe, $cb, $end);
147                 } else { # generic PSGI
148                         $cb->() while $qx;
149                 }
150         });
151 }
152
153 # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
154 sub filter_fh ($$) {
155         my ($fh, $filter) = @_;
156         Plack::Util::inline_object(
157                 close => sub {
158                         $fh->write($filter->(undef));
159                         $fh->close;
160                 },
161                 write => sub {
162                         $fh->write($filter->($_[0]));
163                 });
164 }
165
166 # Used for streaming the stdout of one process as a PSGI response.
167 #
168 # $env is the PSGI env.
169 # optional keys in $env:
170 #   $env->{'qspawn.wcb'} - the write callback from the PSGI server
171 #                          optional, use this if you've already
172 #                          captured it elsewhere.  If not given,
173 #                          psgi_return will return an anonymous
174 #                          sub for the PSGI server to call
175 #
176 #   $env->{'qspawn.filter'} - filter callback, receives a string as input,
177 #                             undef on EOF
178 #
179 # $limiter - the Limiter object to use (uses the def_limiter if not given)
180 #
181 # $parse_hdr - Initial read function; often for parsing CGI header output.
182 #              It will be given the return value of sysread from the pipe
183 #              and a string ref of the current buffer.  Returns an arrayref
184 #              for PSGI responses.  2-element arrays in PSGI mean the
185 #              body will be streamed, later, via writes (push-based) to
186 #              psgix.io.  3-element arrays means the body is available
187 #              immediately (or streamed via ->getline (pull-based)).
188 sub psgi_return {
189         my ($self, $env, $limiter, $parse_hdr) = @_;
190         my ($fh, $rpipe);
191         my $end = sub {
192                 _psgi_finish($self, $env);
193                 $fh->close if $fh; # async-only
194         };
195
196         my $buf = '';
197         my $rd_hdr = sub {
198                 my $r = sysread($rpipe, $buf, 1024, length($buf));
199                 return if !defined($r) && $! == EAGAIN || $! == EINTR;
200                 $parse_hdr->($r, \$buf);
201         };
202
203         my $wcb = delete $env->{'qspawn.wcb'};
204         my $async = $env->{'pi-httpd.async'};
205
206         my $cb = sub {
207                 my $r = $rd_hdr->() or return;
208                 $rd_hdr = undef;
209                 my $filter = delete $env->{'qspawn.filter'};
210                 if (scalar(@$r) == 3) { # error
211                         if ($async) {
212                                 $async->close; # calls rpipe->close and $end
213                         } else {
214                                 $rpipe->close;
215                                 $end->();
216                         }
217                         $wcb->($r);
218                 } elsif ($async) {
219                         $fh = $wcb->($r); # scalar @$r == 2
220                         $fh = filter_fh($fh, $filter) if $filter;
221                         $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
222                 } else { # for synchronous PSGI servers
223                         require PublicInbox::GetlineBody;
224                         $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
225                                                                 $buf, $filter);
226                         $wcb->($r);
227                 }
228         };
229         $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
230         my $start_cb = sub { # may run later, much later...
231                 ($rpipe) = @_;
232                 if ($async) {
233                         # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
234                         $async = $async->($rpipe, $cb, $end);
235                 } else { # generic PSGI
236                         $cb->() while $rd_hdr;
237                 }
238         };
239
240         # the caller already captured the PSGI write callback from
241         # the PSGI server, so we can call ->start, here:
242         return $self->start($limiter, $start_cb) if $wcb;
243
244         # the caller will return this sub to the PSGI server, so
245         # it can set the response callback (that is, for PublicInbox::HTTP,
246         # the chunked_wcb or identity_wcb callback), but other HTTP servers
247         # are supported:
248         sub {
249                 ($wcb) = @_;
250                 $self->start($limiter, $start_cb);
251         };
252 }
253
254 package PublicInbox::Qspawn::Limiter;
255 use strict;
256 use warnings;
257
258 sub new {
259         my ($class, $max) = @_;
260         bless {
261                 # 32 is same as the git-daemon connection limit
262                 max => $max || 32,
263                 running => 0,
264                 run_queue => [],
265                 # RLIMIT_CPU => undef,
266                 # RLIMIT_DATA => undef,
267                 # RLIMIT_CORE => undef,
268         }, $class;
269 }
270
271 sub setup_rlimit {
272         my ($self, $name, $config) = @_;
273         foreach my $rlim (PublicInbox::Spawn::RLIMITS()) {
274                 my $k = lc($rlim);
275                 $k =~ tr/_//d;
276                 $k = "publicinboxlimiter.$name.$k";
277                 defined(my $v = $config->{$k}) or next;
278                 my @rlimit = split(/\s*,\s*/, $v);
279                 if (scalar(@rlimit) == 1) {
280                         push @rlimit, $rlimit[0];
281                 } elsif (scalar(@rlimit) != 2) {
282                         warn "could not parse $k: $v\n";
283                 }
284                 eval { require BSD::Resource };
285                 if ($@) {
286                         warn "BSD::Resource missing for $rlim";
287                         next;
288                 }
289                 foreach my $i (0..$#rlimit) {
290                         next if $rlimit[$i] ne 'INFINITY';
291                         $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
292                 }
293                 $self->{$rlim} = \@rlimit;
294         }
295 }
296
297 # captures everything into a buffer and executes a callback when done
298 package PublicInbox::Qspawn::Qx;
299 use strict;
300 use warnings;
301
302 sub new {
303         my ($class) = @_;
304         my $buf = '';
305         bless \$buf, $class;
306 }
307
308 # called by PublicInbox::HTTPD::Async ($fh->write)
309 sub write {
310         ${$_[0]} .= $_[1];
311         undef;
312 }
313
314 1;