Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/Qspawn.pm
qspawn: implement psgi_return and use it for githttpbackend
[public-inbox.git] / lib / PublicInbox / Qspawn.pm
1 # Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Limits the number of processes spawned
5 # This does not depend on Danga::Socket or any other external
6 # scheduling mechanism, you just need to call start and finish
7 # appropriately
8 package PublicInbox::Qspawn;
9 use strict;
10 use warnings;
11 use PublicInbox::Spawn qw(popen_rd);
12 my $def_limiter;
13
# Constructor: remembers how to run a command without running it yet.
#
#   $cmd, $env, $opt - stored verbatim in $self->{args} and later handed,
#                      in this order, to popen_rd() by _do_spawn().
#                      psgi_return() dereferences $cmd as an array ref
#                      when it builds an error message.
#
# Returns the new object blessed into $class.
#
# No prototype on purpose: prototypes are ignored for method calls, and
# the previous one only described three of the four arguments.
sub new {
        my ($class, $cmd, $env, $opt) = @_;
        return bless { args => [ $cmd, $env, $opt ] }, $class;
}
18
# Spawns the command stored in $self->{args} and hands the read end of
# its pipe to $cb.
#
# On success $self->{rpipe} and $self->{pid} are set and the limiter's
# running count goes up by one.  On failure (popen_rd gave no PID) the
# current $! is saved in $self->{err} and the running count is left
# alone.  $cb is invoked in both cases, so after a failed spawn it
# receives an undefined pipe.
#
# The return value is whatever $cb returns.
sub _do_spawn {
        my ($self, $cb) = @_;

        ($self->{rpipe}, $self->{pid}) = popen_rd(@{$self->{args}});
        if (defined $self->{pid}) {
                $self->{limiter}->{running}++;
        } else {
                $self->{err} = $!;
        }
        $cb->($self->{rpipe});
}
31
# Releases this object's slot in the limiter and starts the next queued
# spawn, if any.
#
# If a pipe was recorded by _do_spawn(), the child is reaped with a
# blocking waitpid: $self->{err} becomes the child's $? (0 on success)
# or a "still running?" message when waitpid does not return the PID.
# The limiter's running count is decremented in that case only.
#
# Afterwards, when the running count is below the limit, one entry is
# taken from the limiter's run_queue and spawned.
#
# Returns $self->{err}; this is false when the child exited with 0 and
# may also hold the $! left behind by a failed spawn.
#
# No prototype on purpose: prototypes are ignored for method calls.
sub finish {
        my ($self) = @_;
        my $limiter = $self->{limiter};
        if (delete $self->{rpipe}) {
                my $pid = delete $self->{pid};
                $self->{err} = $pid == waitpid($pid, 0) ? $? :
                                "PID:$pid still running?";
                $limiter->{running}--;
        }

        # limiter->{max} may change dynamically
        if ($limiter->{running} < $limiter->{max}) {
                if (my $next = shift @{$limiter->{run_queue}}) {
                        _do_spawn(@$next);
                }
        }
        return $self->{err};
}
51
# Runs the command now if the limiter has a free slot, otherwise parks
# it on the limiter's run_queue (finish() takes entries off that queue).
#
#   $limiter  - hash-like object with running, max and run_queue
#   $on_ready - callback given to _do_spawn(); it receives the read pipe
#
# $limiter is remembered in $self->{limiter} either way.
sub start {
        my ($self, $limiter, $on_ready) = @_;
        $self->{limiter} = $limiter;

        # a slot is free, spawn immediately
        return _do_spawn($self, $on_ready)
                if $limiter->{running} < $limiter->{max};

        # saturated: remember who is waiting and with which callback
        push @{$limiter->{run_queue}}, [ $self, $on_ready ];
}
62
# Builds a PSGI delayed-response code ref which streams the output of
# the spawned command once the limiter lets it run.
#
#   $env       - PSGI environment; 'psgi.errors', 'psgix.io' and
#                'pi-httpd.async' are read from it
#   $limiter   - optional; a shared default (32 slots) is created on
#                first use when it is false
#   $parse_hdr - called as $parse_hdr->($nread, \$buf) after every read
#                from the pipe.  A false result means "keep reading",
#                an array ref with three elements is sent as an error
#                response, any other result is treated as the
#                two-element [ status, headers ] form.
#
# Returns a code ref taking the PSGI responder as its only argument.
sub psgi_return {
        my ($self, $env, $limiter, $parse_hdr) = @_;
        my $async = $env->{'pi-httpd.async'};
        my $buf = '';
        my ($res, $rpipe, $fh);

        # cleanup: reap the child, report failures, close the async writer
        my $end = sub {
                my $err = $self->finish;
                if ($err) {
                        my $cmd = join(' ', @{$self->{args}->[0]});
                        $env->{'psgi.errors'}->print("$cmd: $err\n");
                }
                $fh->close if $fh; # async-only
        };

        # Danga::Socket users, we queue up the read_enable callback to
        # fire after pending writes are complete:
        my $rd_hdr = sub {
                my $nread = sysread($rpipe, $buf, 1024, length($buf));
                unless (defined $nread) {
                        # transient failure, the caller will try again
                        return if $!{EINTR} || $!{EAGAIN};
                }
                $parse_hdr->($nread, \$buf);
        };

        my $cb = sub {
                my $hdr = $rd_hdr->() or return;
                undef $rd_hdr; # header is complete, stop the read loop

                if (scalar(@$hdr) == 3) { # error
                        if ($async) {
                                $async->close; # calls rpipe->close
                        } else {
                                $rpipe->close;
                                $end->();
                        }
                        return $res->($hdr);
                }
                if ($async) {
                        $fh = $res->($hdr); # scalar @$hdr == 2
                        return $async->async_pass($env->{'psgix.io'},
                                                $fh, \$buf);
                }

                # for synchronous PSGI servers
                require PublicInbox::GetlineBody;
                $hdr->[2] = PublicInbox::GetlineBody->new($rpipe, $end, $buf);
                $res->($hdr);
        };

        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
        return sub {
                ($res) = @_;
                $self->start($limiter, sub { # may run later, much later...
                        ($rpipe) = @_;
                        if (!$async) { # generic PSGI
                                $cb->() while $rd_hdr;
                        } else {
                        # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
                                $async = $async->($rpipe, $cb, $end);
                        }
                });
        };
}
119
# Bookkeeping shared by all PublicInbox::Qspawn objects that must not
# exceed a common number of concurrently running processes.
package PublicInbox::Qspawn::Limiter;
use strict;
use warnings;

# Creates a limiter with no running processes and an empty run queue.
#
#   $max - highest number of simultaneously running processes; any
#          false value (undef, 0, '') selects the default of 32
sub new {
        my ($class, $max) = @_;
        my %state = (
                run_queue => [],
                running => 0,
                # 32 is same as the git-daemon connection limit
                max => $max || 32,
        );
        bless \%state, $class;
}
133
134 1;