]> Sergey Matveev's repositories - public-inbox.git/commitdiff
git-http-backend: refactor to support cleanup
authorEric Wong <e@80x24.org>
Mon, 23 May 2016 04:01:14 +0000 (04:01 +0000)
committerEric Wong <e@80x24.org>
Mon, 23 May 2016 05:58:35 +0000 (05:58 +0000)
We will have clients dropping connections during long clone
and fetch operations; so do not retain references holding
backend processes once we detect a client has dropped.

lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/HTTPD.pm
lib/PublicInbox/HTTPD/Async.pm

index 70990ebc5a6a8fa3f76ac433d811f888cc264a58..ded56b3330f742c95d965ab59ef18fd74b561215 100644 (file)
@@ -194,6 +194,7 @@ sub serve_smart {
                return;
        }
        $wpipe = $in = undef;
+       my $fh;
        my $end = sub {
                $rpipe = undef;
                my $e = $pid == waitpid($pid, 0) ?
@@ -202,60 +203,57 @@ sub serve_smart {
                        err($env, "git http-backend ($git_dir): $e");
                        drop_client($env);
                }
+               $fh->close if $fh; # async-only
        };
 
        # Danga::Socket users, we queue up the read_enable callback to
        # fire after pending writes are complete:
        my $buf = '';
-       if (my $async = $env->{'pi-httpd.async'}) {
-               my $res;
-               my $q = sub {
-                       $async->close;
-                       $end->();
-                       $res->(@_);
-               };
-               # $async is PublicInbox::HTTPD::Async->new($rpipe, $cb)
-               $async = $async->($rpipe, sub {
-                       my $r = sysread($rpipe, $buf, 1024, length($buf));
-                       if (!defined $r || $r == 0) {
-                               return $q->(r(500, 'http-backend error'));
-                       }
-                       $r = parse_cgi_headers(\$buf) or return;
-                       if ($r->[0] == 403) {
-                               return $q->(serve_dumb($cgi, $git, $path));
-                       }
-                       my $fh = $res->($r);
-                       $fh->write($buf);
-                       $buf = undef;
-                       my $dst = Plack::Util::inline_object(
-                               write => sub { $fh->write(@_) },
-                               close => sub {
-                                       $end->();
-                                       $fh->close;
-                               });
-                       $async->async_pass($env->{'psgix.io'}, $dst);
-               });
-               sub { ($res) = @_ }; # let Danga::Socket handle the rest.
-       } else { # getline + close for other PSGI servers
-               my $r;
-               do {
-                       $r = read($rpipe, $buf, 1024, length($buf));
-                       if (!defined $r || $r == 0) {
-                               return r(500, 'http-backend error');
-                       }
-                       $r = parse_cgi_headers(\$buf);
-               } until ($r);
-               return serve_dumb($cgi, $git, $path) if $r->[0] == 403;
+       my $rd_hdr = sub {
+               my $r = sysread($rpipe, $buf, 1024, length($buf));
+               return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+               return r(500, 'http-backend error') unless $r;
+               $r = parse_cgi_headers(\$buf) or return;
+               $r->[0] == 403 ? serve_dumb($cgi, $git, $path) : $r;
+       };
+       my $res;
+       my $async = $env->{'pi-httpd.async'};
+       my $io = $env->{'psgix.io'};
+       my $cb = sub {
+               my $r = $rd_hdr->() or return;
+               $rd_hdr = undef;
+               if (scalar(@$r) == 3) { # error:
+                       $async->close if $async;
+                       return $res->($r);
+               }
+               if ($async) {
+                       $fh = $res->($r);
+                       return $async->async_pass($io, $fh, \$buf);
+               }
+
+               # for synchronous PSGI servers
                $r->[2] = Plack::Util::inline_object(
-                       close => sub { $end->() },
+                       close => $end,
                        getline => sub {
                                my $ret = $buf;
                                $buf = undef;
                                defined $ret ? $ret : $rpipe->getline;
                        });
-               $r;
+               $res->($r);
+       };
+       sub {
+               ($res) = @_;
 
-       }
+               # hopefully this doesn't break any middlewares,
+               # holding the input here is a waste of FDs and memory
+               $env->{'psgi.input'} = undef;
+
+               if ($async) {
+                       $async = $async->($rpipe, $cb, $end);
+               } else { # generic PSGI
+                       $cb->() while $rd_hdr;
+               }
+       };
 }
 
 sub input_to_file {
index 78efaa50b417177139333c52143247685e70757f..433d6da7787da13881ee78552d1da8be0712d343 100644 (file)
@@ -8,10 +8,7 @@ use Plack::Util;
 require PublicInbox::HTTPD::Async;
 require PublicInbox::Daemon;
 
-sub pi_httpd_async {
-       my ($io, $cb) = @_;
-       PublicInbox::HTTPD::Async->new($io, $cb);
-}
+sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) }
 
 sub new {
        my ($class, $sock, $app) = @_;
index 8efa7a668ff027b02e78a1d9173487efb94e8423..bd2eacbfdfee8a5be8f3e9b423907a9ac16b5851 100644 (file)
@@ -9,32 +9,33 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(cb);
+use fields qw(cb cleanup);
 
 sub new {
-       my ($class, $io, $cb) = @_;
+       my ($class, $io, $cb, $cleanup) = @_;
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
        $self->SUPER::new($io);
        $self->{cb} = $cb;
+       $self->{cleanup} = $cleanup;
        $self->watch_read(1);
        $self;
 }
 
 sub async_pass {
-       my ($self, $io, $fh) = @_;
+       my ($self, $io, $fh, $bref) = @_;
        my $restart_read = sub { $self->watch_read(1) };
-
        # In case the client HTTP connection ($io) dies, it
        # will automatically close this ($self) object.
        $io->{forward} = $self;
+       $fh->write($$bref);
        $self->{cb} = sub {
-               my $r = sysread($self->{sock}, my $buf, 8192);
+               my $r = sysread($self->{sock}, $$bref, 8192);
                if ($r) {
-                       $fh->write($buf);
+                       $fh->write($$bref);
                        if ($io->{write_buf_size}) {
                                $self->watch_read(0);
-                               $io->write($restart_read);
+                               $io->write($restart_read); # D::S::write
                        }
                        return; # stay in watch_read
                } elsif (!defined $r) {
@@ -42,9 +43,9 @@ sub async_pass {
                }
 
                # Done! Error handling will happen in $fh->close
+               # called by the {cleanup} handler
                $io->{forward} = undef;
                $self->close;
-               $fh->close;
        }
 }
 
@@ -55,8 +56,12 @@ sub sysread { shift->{sock}->sysread(@_) }
 
 sub close {
        my $self = shift;
-       $self->{cb} = undef;
+       my $cleanup = $self->{cleanup};
+       $self->{cleanup} = $self->{cb} = undef;
        $self->SUPER::close(@_);
+
+       # we defer this to the next timer loop since close is deferred
+       Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
 }
 
 # do not let ourselves be closed during graceful termination