]> Sergey Matveev's repositories - public-inbox.git/commitdiff
git-http-backend: switch to async_pass
authorEric Wong <e@80x24.org>
Sun, 22 May 2016 09:06:03 +0000 (09:06 +0000)
committerEric Wong <e@80x24.org>
Sun, 22 May 2016 10:08:44 +0000 (10:08 +0000)
This simplifies the code somewhat; but it could probably
still be made simpler.  It will need to support command
queueing for expensive commands so expensive processes
can be queued up.

lib/PublicInbox/GitHTTPBackend.pm

index 97d96d526121752060ff3274dbcdfee57cfebc31..cca8a6d6018c083f34077fe5834ad94f2ea89094 100644 (file)
@@ -10,6 +10,7 @@ use Fcntl qw(:seek);
 use IO::File;
 use PublicInbox::Spawn qw(spawn);
 use HTTP::Date qw(time2str);
+use HTTP::Status qw(status_message);
 
 # n.b. serving "description" and "cloneurl" should be innocuous enough to
 # not cause problems.  serving "config" might...
@@ -39,9 +40,12 @@ sub do_next () {
        }
 }
 
-sub r ($) {
-       my ($s) = @_;
-       [ $s, [qw(Content-Type text/plain Content-Length 0), @no_cache ], [] ]
+sub r ($;$) {
+       my ($code, $msg) = @_;
+       $msg ||= status_message($code);
+       my $len = length($msg);
+       [ $code, [qw(Content-Type text/plain Content-Length), $len, @no_cache],
+               [$msg] ]
 }
 
 sub serve {
@@ -167,14 +171,9 @@ sub prepare_range {
 sub serve_smart {
        my ($cgi, $git, $path) = @_;
        my $env = $cgi->{env};
-
-       my $input = $env->{'psgi.input'};
-       my $buf;
-       my $in;
-       my $fd = eval { fileno($input) };
-       if (defined $fd && $fd >= 0) {
-               $in = $input;
-       } else {
+       my $in = $env->{'psgi.input'};
+       my $fd = eval { fileno($in) };
+       unless (defined $fd && $fd >= 0) {
                $in = input_to_file($env) or return r(500);
        }
        my ($rpipe, $wpipe);
@@ -204,91 +203,67 @@ sub serve_smart {
                return;
        }
        $wpipe = $in = undef;
-       $buf = '';
-       my ($vin, $fh, $res);
-
-       # Danga::Socket users, we queue up the read_enable callback to
-       # fire after pending writes are complete:
-       my $pi_http = $env->{'psgix.io'};
-       my $read_enable = sub { $rpipe->watch_read(1) };
-       my $read_disable = sub {
-               $rpipe->watch_read(0);
-               $pi_http->write($read_enable);
-       };
-
        my $end = sub {
-               if ($fh) {
-                       $fh->close;
-                       $fh = undef;
-               }
-               if ($rpipe) {
-                       # _may_ be Danga::Socket::close via
-                       # PublicInbox::HTTPD::Async::close:
-                       $rpipe->close;
-                       $rpipe = undef;
-               }
-               if (defined $pid) {
-                       my $e = $pid == waitpid($pid, 0) ?
-                               $? : "PID:$pid still running?";
-                       err($env, "git http-backend ($git_dir): $e") if $e;
-               }
-               return unless $res;
-               my $dumb = serve_dumb($cgi, $git, $path);
-               ref($dumb) eq 'ARRAY' ? $res->($dumb) : $dumb->($res);
-       };
-       my $fail = sub {
-               if ($!{EAGAIN} || $!{EINTR}) {
-                       select($vin, undef, undef, undef) if defined $vin;
-                       # $vin is undef on async, so this is a noop on EAGAIN
-                       return;
+               $rpipe = undef;
+               my $e = $pid == waitpid($pid, 0) ?
+                       $? : "PID:$pid still running?";
+               if ($e) {
+                       err($env, "git http-backend ($git_dir): $e");
+                       drop_client($env);
                }
-               my $e = $!;
-               $end->();
-               err($env, "git http-backend ($git_dir): $e\n");
-       };
-       my $cb = sub { # read git-http-backend output and stream to client
-               my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
-               return $fail->() unless defined $r;
-               return $end->() if $r == 0; # EOF
-               if ($fh) { # stream body from git-http-backend to HTTP client
-                       $fh->write($buf);
-                       $buf = '';
-                       $read_disable->() if $read_disable;
-               } elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers
-                       my $h = $1;
-                       my $code = 200;
-                       my @h;
-                       foreach my $l (split(/\r\n/, $h)) {
-                               my ($k, $v) = split(/:\s*/, $l, 2);
-                               if ($k =~ /\AStatus\z/i) {
-                                       ($code) = ($v =~ /\b(\d+)\b/);
-                               } else {
-                                       push @h, $k, $v;
-                               }
-                       }
-                       if ($code == 403) {
-                               # smart cloning disabled, serve dumbly
-                               # in $end since we never undef $res in here
-                       } else { # write response header:
-                               $fh = $res->([ $code, \@h ]);
-                               $res = undef;
-                               $fh->write($buf);
-                       }
-                       $buf = '';
-               } # else { keep reading ... }
        };
+
+       # Danga::Socket users, we queue up the read_enable callback to
+       # fire after pending writes are complete:
+       my $buf = '';
        if (my $async = $env->{'pi-httpd.async'}) {
+               my $res;
+               my $q = sub {
+                       $async->close;
+                       $end->();
+                       $res->(@_);
+               };
                # $async is PublicInbox::HTTPD::Async->new($rpipe, $cb)
-               $rpipe = $async->($rpipe, $cb);
-               sub { ($res) = @_ } # let Danga::Socket handle the rest.
-       } else { # synchronous loop for other PSGI servers
-               $read_enable = $read_disable = undef;
-               $vin = '';
-               vec($vin, fileno($rpipe), 1) = 1;
-               sub {
-                       ($res) = @_;
-                       while ($rpipe) { $cb->() }
-               }
+               $async = $async->($rpipe, sub {
+                       my $r = sysread($rpipe, $buf, 1024, length($buf));
+                       if (!defined $r || $r == 0) {
+                               return $q->(r(500, 'http-backend error'));
+                       }
+                       $r = parse_cgi_headers(\$buf) or return;
+                       if ($r->[0] == 403) {
+                               return $q->(serve_dumb($cgi, $git, $path));
+                       }
+                       my $fh = $res->($r);
+                       $fh->write($buf);
+                       $buf = undef;
+                       my $dst = Plack::Util::inline_object(
+                               write => sub { $fh->write(@_) },
+                               close => sub {
+                                       $end->();
+                                       $fh->close;
+                               });
+                       $async->async_pass($env->{'psgix.io'}, $dst);
+               });
+               sub { ($res) = @_ }; # let Danga::Socket handle the rest.
+       } else { # getline + close for other PSGI servers
+               my $r;
+               do {
+                       $r = read($rpipe, $buf, 1024, length($buf));
+                       if (!defined $r || $r == 0) {
+                               return r(500, 'http-backend error');
+                       }
+                       $r = parse_cgi_headers(\$buf);
+               } until ($r);
+               return serve_dumb($cgi, $git, $path) if $r->[0] == 403;
+               $r->[2] = Plack::Util::inline_object(
+                       close => sub { $end->() },
+                       getline => sub {
+                               my $ret = $buf;
+                               $buf = undef;
+                               defined $ret ? $ret : $rpipe->getline;
+                       });
+               $r;
+
        }
 }
 
@@ -311,4 +286,21 @@ sub input_to_file {
        return $in;
 }
 
+sub parse_cgi_headers {
+       my ($bref) = @_;
+       $$bref =~ s/\A(.*?)\r\n\r\n//s or return;
+       my $h = $1;
+       my $code = 200;
+       my @h;
+       foreach my $l (split(/\r\n/, $h)) {
+               my ($k, $v) = split(/:\s*/, $l, 2);
+               if ($k =~ /\AStatus\z/i) {
+                       ($code) = ($v =~ /\b(\d+)\b/);
+               } else {
+                       push @h, $k, $v;
+               }
+       }
+       [ $code, \@h ]
+}
+
 1;