From 347c6ee595c37d4e2214cb297811f154a41c452f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 23 May 2016 04:01:14 +0000 Subject: [PATCH] git-http-backend: refactor to support cleanup We will have clients dropping connections during long clone and fetch operations; so do not retain references holding backend processes once we detect a client has dropped. --- lib/PublicInbox/GitHTTPBackend.pm | 82 +++++++++++++++---------------- lib/PublicInbox/HTTPD.pm | 5 +- lib/PublicInbox/HTTPD/Async.pm | 23 +++++---- 3 files changed, 55 insertions(+), 55 deletions(-) diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index 70990ebc..ded56b33 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -194,6 +194,7 @@ sub serve_smart { return; } $wpipe = $in = undef; + my $fh; my $end = sub { $rpipe = undef; my $e = $pid == waitpid($pid, 0) ? @@ -202,60 +203,57 @@ sub serve_smart { err($env, "git http-backend ($git_dir): $e"); drop_client($env); } + $fh->close if $fh; # async-only }; # Danga::Socket users, we queue up the read_enable callback to # fire after pending writes are complete: my $buf = ''; - if (my $async = $env->{'pi-httpd.async'}) { - my $res; - my $q = sub { - $async->close; - $end->(); - $res->(@_); - }; - # $async is PublicInbox::HTTPD::Async->new($rpipe, $cb) - $async = $async->($rpipe, sub { - my $r = sysread($rpipe, $buf, 1024, length($buf)); - if (!defined $r || $r == 0) { - return $q->(r(500, 'http-backend error')); - } - $r = parse_cgi_headers(\$buf) or return; - if ($r->[0] == 403) { - return $q->(serve_dumb($cgi, $git, $path)); - } - my $fh = $res->($r); - $fh->write($buf); - $buf = undef; - my $dst = Plack::Util::inline_object( - write => sub { $fh->write(@_) }, - close => sub { - $end->(); - $fh->close; - }); - $async->async_pass($env->{'psgix.io'}, $dst); - }); - sub { ($res) = @_ }; # let Danga::Socket handle the rest. - } else { # getline + close for other PSGI servers - my $r; - do { - $r = read($rpipe, $buf, 1024, length($buf)); - if (!defined $r || $r == 0) { - return r(500, 'http-backend error'); - } - $r = parse_cgi_headers(\$buf); - } until ($r); - return serve_dumb($cgi, $git, $path) if $r->[0] == 403; + my $rd_hdr = sub { + my $r = sysread($rpipe, $buf, 1024, length($buf)); + return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); + return r(500, 'http-backend error') unless $r; + $r = parse_cgi_headers(\$buf) or return; + $r->[0] == 403 ? serve_dumb($cgi, $git, $path) : $r; + }; + my $res; + my $async = $env->{'pi-httpd.async'}; + my $io = $env->{'psgix.io'}; + my $cb = sub { + my $r = $rd_hdr->() or return; + $rd_hdr = undef; + if (scalar(@$r) == 3) { # error: + $async->close if $async; + return $res->($r); + } + if ($async) { + $fh = $res->($r); + return $async->async_pass($io, $fh, \$buf); + } + + # for synchronous PSGI servers $r->[2] = Plack::Util::inline_object( - close => sub { $end->() }, + close => $end, getline => sub { my $ret = $buf; $buf = undef; defined $ret ? $ret : $rpipe->getline; }); - $r; + $res->($r); + }; + sub { + ($res) = @_; - } + # hopefully this doesn't break any middlewares, + # holding the input here is a waste of FDs and memory + $env->{'psgi.input'} = undef; + + if ($async) { + $async = $async->($rpipe, $cb, $end); + } else { # generic PSGI + $cb->() while $rd_hdr; + } + }; } sub input_to_file { diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm index 78efaa50..433d6da7 100644 --- a/lib/PublicInbox/HTTPD.pm +++ b/lib/PublicInbox/HTTPD.pm @@ -8,10 +8,7 @@ use Plack::Util; require PublicInbox::HTTPD::Async; require PublicInbox::Daemon; -sub pi_httpd_async { - my ($io, $cb) = @_; - PublicInbox::HTTPD::Async->new($io, $cb); -} +sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) } sub new { my ($class, $sock, $app) = @_; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 8efa7a66..bd2eacbf 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -9,32 +9,33 @@ package PublicInbox::HTTPD::Async; use strict; use warnings; use base qw(Danga::Socket); -use fields qw(cb); +use fields qw(cb cleanup); sub new { - my ($class, $io, $cb) = @_; + my ($class, $io, $cb, $cleanup) = @_; my $self = fields::new($class); IO::Handle::blocking($io, 0); $self->SUPER::new($io); $self->{cb} = $cb; + $self->{cleanup} = $cleanup; $self->watch_read(1); $self; } sub async_pass { - my ($self, $io, $fh) = @_; + my ($self, $io, $fh, $bref) = @_; my $restart_read = sub { $self->watch_read(1) }; - # In case the client HTTP connection ($io) dies, it # will automatically close this ($self) object. $io->{forward} = $self; + $fh->write($$bref); $self->{cb} = sub { - my $r = sysread($self->{sock}, my $buf, 8192); + my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { - $fh->write($buf); + $fh->write($$bref); if ($io->{write_buf_size}) { $self->watch_read(0); - $io->write($restart_read); + $io->write($restart_read); # D::S::write } return; # stay in watch_read } elsif (!defined $r) { @@ -42,9 +43,9 @@ sub async_pass { } # Done! Error handling will happen in $fh->close + # called by the {cleanup} handler $io->{forward} = undef; $self->close; - $fh->close; } } @@ -55,8 +56,12 @@ sub sysread { shift->{sock}->sysread(@_) } sub close { my $self = shift; - $self->{cb} = undef; + my $cleanup = $self->{cleanup}; + $self->{cleanup} = $self->{cb} = undef; $self->SUPER::close(@_); + + # we defer this to the next timer loop since close is deferred + Danga::Socket->AddTimer(0, $cleanup) if $cleanup; } # do not let ourselves be closed during graceful termination -- 2.44.0