X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FQspawn.pm;h=651fa39096d7144beb7d9bb6030fcb67c9873df0;hb=4d44a2ca68ba1b88ee5245df7b74492f9e41db6d;hp=844d50f7cdeedb6775df8183df8a2d3f9bb83c18;hpb=641191aa902bb3259a13d0f348b78a49aafe4902;p=public-inbox.git diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 844d50f7..651fa390 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -44,11 +44,11 @@ sub new ($$$;) { } sub _do_spawn { - my ($self, $cb) = @_; + my ($self, $start_cb, $limiter) = @_; my $err; my ($cmd, $env, $opts) = @{$self->{args}}; my %opts = %{$opts || {}}; - my $limiter = $self->{limiter}; + $self->{limiter} = $limiter; foreach my $k (PublicInbox::Spawn::RLIMITS()) { if (defined(my $rlimit = $limiter->{$k})) { $opts{$k} = $rlimit; @@ -66,7 +66,7 @@ sub _do_spawn { } else { $self->{err} = $!; } - $cb->($self->{rpipe}); + $start_cb->($self->{rpipe}); } sub child_err ($) { @@ -94,29 +94,34 @@ sub waitpid_err ($$) { $err = "W: waitpid($xpid, 0) => $pid: $!"; } # else should not be called with pid == 0 + my $env = delete $self->{env}; + # done, spawn whatever's in the queue my $limiter = $self->{limiter}; my $running = --$limiter->{running}; - # limiter->{max} may change dynamically - if (($running || $limiter->{running}) < $limiter->{max}) { - if (my $next = shift @{$limiter->{run_queue}}) { - _do_spawn(@$next); + if ($running < $limiter->{max}) { + if (my $next = shift(@{$limiter->{run_queue}})) { + _do_spawn(@$next, $limiter); } } - return unless $err; - $self->{err} = $err; - my $env = $self->{env} or return; - if (!$env->{'qspawn.quiet'}) { - log_err($env, join(' ', @{$self->{args}}) . ": $err"); + if ($err) { + $self->{err} = $err; + if ($env && !$env->{'qspawn.quiet'}) { + log_err($env, join(' ', @{$self->{args}}) . ": $err"); + } + } + if (my $fin_cb = delete $self->{fin_cb}) { + eval { $fin_cb->() } } } -sub do_waitpid ($;$) { - my ($self, $env) = @_; +sub do_waitpid ($;$$) { + my ($self, $env, $fin_cb) = @_; my $pid = $self->{pid}; $self->{env} = $env; + $self->{fin_cb} = $fin_cb; # PublicInbox::DS may not be loaded eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) }; # done if we're running in PublicInbox::DS::EventLoop @@ -127,29 +132,21 @@ sub do_waitpid ($;$) { } } -sub finish ($;$) { - my ($self, $env) = @_; +sub finish ($;$$) { + my ($self, $env, $fin_cb) = @_; if (delete $self->{rpipe}) { - do_waitpid($self, $env); - } - - # limiter->{max} may change dynamically - my $limiter = $self->{limiter}; - if ($limiter->{running} < $limiter->{max}) { - if (my $next = shift @{$limiter->{run_queue}}) { - _do_spawn(@$next); - } + do_waitpid($self, $env, $fin_cb); + } elsif ($fin_cb) { + eval { $fin_cb->() }; } } sub start { - my ($self, $limiter, $cb) = @_; - $self->{limiter} = $limiter; - + my ($self, $limiter, $start_cb) = @_; if ($limiter->{running} < $limiter->{max}) { - _do_spawn($self, $cb); + _do_spawn($self, $start_cb, $limiter); } else { - push @{$limiter->{run_queue}}, [ $self, $cb ]; + push @{$limiter->{run_queue}}, [ $self, $start_cb ]; } } @@ -164,9 +161,8 @@ sub psgi_qx { my $end = sub { my $err = $_[0]; # $! log_err($env, "psgi_qx: $err") if defined($err); - finish($self, $env); - eval { $qx_cb->(\$scalar) }; - $qx = $scalar = undef; + finish($self, $env, sub { $qx_cb->(\$scalar) }); + $qx = undef; }; my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; @@ -185,11 +181,12 @@ reread: } }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); - $self->start($limiter, sub { # may run later, much later... + $self->start($limiter, sub { # start_cb, may run later, much later... ($rpipe) = @_; # popen_rd result if ($async) { # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) $async = $async->($rpipe, $cb, $end); + # $cb will call ->async_pass or ->close } else { # generic PSGI $cb->() while $qx; } @@ -264,7 +261,7 @@ sub psgi_return { $ret; }; - my $wcb = delete $env->{'qspawn.wcb'}; + my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it my $async = $env->{'pi-httpd.async'}; my $cb = sub { @@ -290,6 +287,10 @@ sub psgi_return { $buf, $filter); $wcb->($r); } + + # Workaround a leak under Perl 5.16.3 when combined with + # Plack::Middleware::Deflater: + $wcb = undef; }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); my $start_cb = sub { # may run later, much later... @@ -297,6 +298,7 @@ sub psgi_return { if ($async) { # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) $async = $async->($rpipe, $cb, $end); + # $cb will call ->async_pass or ->close } else { # generic PSGI $cb->() while $rd_hdr; }