]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/GitHTTPBackend.pm
git-http-backend: start async API for streaming
[public-inbox.git] / lib / PublicInbox / GitHTTPBackend.pm
index 71b7a8f1b1c12278730dd8c8cd64cf6f3a93d415..9c32535bb70ad414195017729e73fd448d84054e 100644 (file)
@@ -55,7 +55,7 @@ sub serve {
        my $len = $size;
        my @h;
 
-       my $env = $cgi->{env} || \%ENV;
+       my $env = $cgi->{env};
        my $range = $env->{HTTP_RANGE};
        if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) {
                ($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size);
@@ -117,9 +117,7 @@ sub prepare_range {
                        push @$h, "bytes $beg-$end/$size";
 
                        # FIXME: Plack::Middleware::Deflater bug?
-                       if (my $env = $cgi->{env}) {
-                               $env->{'psgix.no-compress'} = 1;
-                       }
+                       $cgi->{env}->{'psgix.no-compress'} = 1;
                }
        }
        ($code, $len);
@@ -134,9 +132,9 @@ sub serve_smart {
        my $buf;
        my $in;
        my $err = $env->{'psgi.errors'};
-       if (fileno($input) >= 0) { # FIXME untested
+       if (fileno($input) >= 0) {
                $in = $input;
-       } else {
+       } else { # FIXME untested
                $in = IO::File->new_tmpfile;
                while (1) {
                        my $r = $input->read($buf, 8192);
@@ -150,12 +148,17 @@ sub serve_smart {
                $in->flush;
                $in->sysseek(0, SEEK_SET);
        }
-       my $out = IO::File->new_tmpfile;
+       my ($rpipe, $wpipe);
+       unless (pipe($rpipe, $wpipe)) {
+               $err->print('error creating pipe', $!, "\n");
+               return r(500);
+       }
        my $pid = fork; # TODO: vfork under Linux...
        unless (defined $pid) {
                $err->print('error forking: ', $!, "\n");
                return r(500);
        }
+       my $git_dir = $git->{git_dir};
        if ($pid == 0) {
                # GIT_HTTP_EXPORT_ALL, GIT_COMMITTER_NAME, GIT_COMMITTER_EMAIL
                # may be set in the server-process and are passed as-is
@@ -170,44 +173,79 @@ sub serve_smart {
                }
                # $ENV{GIT_PROJECT_ROOT} = $git->{git_dir};
                $ENV{GIT_HTTP_EXPORT_ALL} = '1';
-               $ENV{PATH_TRANSLATED} = "$git->{git_dir}/$path";
+               $ENV{PATH_TRANSLATED} = "$git_dir/$path";
                dup2(fileno($in), 0) or die "redirect stdin failed: $!\n";
-               dup2(fileno($out), 1) or die "redirect stdout failed: $!\n";
+               dup2(fileno($wpipe), 1) or die "redirect stdout failed: $!\n";
                my @cmd = qw(git http-backend);
                exec(@cmd) or die 'exec `' . join(' ', @cmd). "' failed: $!\n";
        }
-
-       if (waitpid($pid, 0) != $pid) {
-               $err->print("git http-backend ($git->{git_dir}): ", $?, "\n");
-               return r(500);
-       }
-       $in = undef;
-       $out->seek(0, SEEK_SET);
-       my @h;
-       my $code = 200;
-       {
-               local $/ = "\r\n";
-               while (defined(my $line = <$out>)) {
-                       if ($line =~ /\AStatus:\s*(\d+)/) {
-                               $code = $1;
-                       } else {
-                               chomp $line;
-                               last if $line eq '';
-                               push @h, split(/:\s*/, $line, 2);
-                       }
+       $wpipe = $in = undef;
+       $buf = '';
+       my ($vin, $fh, $res);
+       my $end = sub {
+               if ($fh) {
+                       $fh->close;
+                       $fh = undef;
+               } else {
+                       $res->(r(500)) if $res;
                }
-       }
-       return if $code == 403;
-       sub {
-               my ($cb) = @_;
-               my $fh = $cb->([ $code, \@h ]);
-               while (1) {
-                       my $r = $out->read($buf, 8192);
-                       die "$!\n" unless defined $r;
-                       last if ($r == 0);
+               if ($rpipe) {
+                       $rpipe->close; # _may_ be Danga::Socket::close
+                       $rpipe = undef;
+               }
+               if (defined $pid) {
+                       my $wpid = $pid;
+                       $pid = undef;
+                       return if $wpid == waitpid($wpid, 0);
+                       $err->print("git http-backend ($git_dir): $?\n");
+               }
+       };
+       my $fail = sub {
+               my ($e) = @_;
+               if ($e eq 'EAGAIN') {
+                       select($vin, undef, undef, undef) if defined $vin;
+                       # $vin is undef on async, so this is a noop on EAGAIN
+                       return;
+               }
+               $end->();
+               $err->print("git http-backend ($git_dir): $e\n");
+       };
+       my $cb = sub { # read git-http-backend output and stream to client
+               my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
+               return $fail->($!{EAGAIN} ? 'EAGAIN' : $!) unless defined $r;
+               return $end->() if $r == 0; # EOF
+               if ($fh) { # stream body from git-http-backend to HTTP client
                        $fh->write($buf);
+                       $buf = '';
+               } elsif ($buf =~ s/\A(.*?)\r?\n\r?\n//s) { # parse headers
+                       my $h = $1;
+                       my $code = 200;
+                       my @h;
+                       foreach my $l (split(/\r?\n/, $h)) {
+                               my ($k, $v) = split(/:\s*/, $l, 2);
+                               if ($k =~ /\AStatus\z/i) {
+                                       $code = int($v);
+                               } else {
+                                       push @h, $k, $v;
+                               }
+                       }
+                       # write response header:
+                       $fh = $res->([ $code, \@h ]);
+                       $res = undef;
+                       $fh->write($buf);
+                       $buf = '';
+               } # else { keep reading ... }
+       };
+       if (my $async = $env->{'pi-httpd.async'}) {
+               $rpipe = $async->($rpipe, $cb);
+               sub { ($res) = @_ } # let Danga::Socket handle the rest.
+       } else { # synchronous loop
+               $vin = '';
+               vec($vin, fileno($rpipe), 1) = 1;
+               sub {
+                       ($res) = @_;
+                       while ($rpipe) { $cb->() }
                }
-               $fh->close;
        }
 }