From da9beb99af585718c36725f3457b1b72347bcebf Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 25 Dec 2016 08:53:19 +0000 Subject: [PATCH] qspawn: implement psgi_return and use it for githttpbackend Was: ("repobrowse: port patch generation over to qspawn") We'll be using it for githttpbackend and maybe other things. --- lib/PublicInbox/GitHTTPBackend.pm | 64 +++---------------------------- lib/PublicInbox/Qspawn.pm | 58 ++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 59 deletions(-) diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 54ccfa05..ab43a009 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -200,69 +200,15 @@ sub serve_smart { $env{$name} = $val if defined $val; } my $limiter = $git->{-httpbackend_limiter} || $default_limiter; - my $git_dir = $git->{git_dir}; $env{GIT_HTTP_EXPORT_ALL} = '1'; - $env{PATH_TRANSLATED} = "$git_dir/$path"; + $env{PATH_TRANSLATED} = "$git->{git_dir}/$path"; my $rdr = { 0 => fileno($in) }; my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr); - my ($fh, $rpipe); - my $end = sub { - if (my $err = $qsp->finish) { - err($env, "git http-backend ($git_dir): $err"); - } - $fh->close if $fh; # async-only - }; - - # Danga::Socket users, we queue up the read_enable callback to - # fire after pending writes are complete: - my $buf = ''; - my $rd_hdr = sub { - my $r = sysread($rpipe, $buf, 1024, length($buf)); - return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); - return r(500, 'http-backend error') unless $r; - $r = parse_cgi_headers(\$buf) or return; # incomplete headers + $qsp->psgi_return($env, $limiter, sub { + my ($r, $bref) = @_; + $r = parse_cgi_headers($bref) or return; # incomplete headers $r->[0] == 403 ? serve_dumb($env, $git, $path) : $r; - }; - my $res; - my $async = $env->{'pi-httpd.async'}; # XXX unstable API - my $cb = sub { - my $r = $rd_hdr->() or return; - $rd_hdr = undef; - if (scalar(@$r) == 3) { # error: - if ($async) { - $async->close; # calls rpipe->close - } else { - $rpipe->close; - $end->(); - } - $res->($r); - } elsif ($async) { - $fh = $res->($r); - $async->async_pass($env->{'psgix.io'}, $fh, \$buf); - } else { # for synchronous PSGI servers - require PublicInbox::GetlineBody; - $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, - $buf); - $res->($r); - } - }; - sub { - ($res) = @_; - - # hopefully this doesn't break any middlewares, - # holding the input here is a waste of FDs and memory - $env->{'psgi.input'} = undef; - - $qsp->start($limiter, sub { # may run later, much later... - ($rpipe) = @_; - $in = undef; - if ($async) { - $async = $async->($rpipe, $cb, $end); - } else { # generic PSGI - $cb->() while $rd_hdr; - } - }); - }; + }); } sub input_to_file { diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 3500f8a4..b80dac1f 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -9,6 +9,7 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); +my $def_limiter; sub new ($$$;) { my ($class, $cmd, $env, $opt) = @_; @@ -59,6 +60,63 @@ sub start { } } +sub psgi_return { + my ($self, $env, $limiter, $parse_hdr) = @_; + my ($fh, $rpipe); + my $end = sub { + if (my $err = $self->finish) { + $err = join(' ', @{$self->{args}->[0]}).": $err\n"; + $env->{'psgi.errors'}->print($err); + } + $fh->close if $fh; # async-only + }; + + # Danga::Socket users, we queue up the read_enable callback to + # fire after pending writes are complete: + my $buf = ''; + my $rd_hdr = sub { + my $r = sysread($rpipe, $buf, 1024, length($buf)); + return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); + $parse_hdr->($r, \$buf); + }; + my $res; + my $async = $env->{'pi-httpd.async'}; + my $cb = sub { + my $r = $rd_hdr->() or return; + $rd_hdr = undef; + if (scalar(@$r) == 3) { # error + if ($async) { + $async->close; # calls rpipe->close + } else { + $rpipe->close; + $end->(); + } + $res->($r); + } elsif ($async) { + $fh = $res->($r); # scalar @$r == 2 + $async->async_pass($env->{'psgix.io'}, $fh, \$buf); + } else { # for synchronous PSGI servers + require PublicInbox::GetlineBody; + $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, + $buf); + $res->($r); + } + }; + $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); + sub { + ($res) = @_; + $self->start($limiter, sub { # may run later, much later... + ($rpipe) = @_; + if ($async) { + # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $rd_hdr; + } + }); + }; +} + package PublicInbox::Qspawn::Limiter; use strict; use warnings; -- 2.44.0