my $BIN = join('|', @binary);
my $TEXT = join('|', @text);
+my $nextq;
+sub do_next () {
+ my $q = $nextq;
+ $nextq = undef;
+ while (my $cb = shift @$q) {
+ $cb->(); # this may redefine nextq
+ }
+}
+
sub r {
[ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ]
}
serve_dumb($cgi, $git, $path);
}
+sub err ($@) {
+ my ($env, @msg) = @_;
+ $env->{'psgi.errors'}->print(@msg, "\n");
+}
+
+sub drop_client ($) {
+ if (my $io = $_[0]->{'psgix.io'}) {
+ $io->close; # this is Danga::Socket::close
+ }
+}
+
sub serve_dumb {
my ($cgi, $git, $path) = @_;
} else {
return r(404);
}
+
my $f = "$git->{git_dir}/$path";
- return r(404) unless -f $f && -r _;
+ return r(404) unless -f $f && -r _; # just in case it's a FIFO :P
my @st = stat(_);
my $size = $st[7];
+ my $env = $cgi->{env};
- # TODO: If-Modified-Since and Last-Modified
+ # TODO: If-Modified-Since and Last-Modified?
open my $in, '<', $f or return r(404);
- my $code = 200;
my $len = $size;
- my @h;
+ my $n = 65536; # try to negotiate a big TCP window, first
+ my ($next, $fh);
+ my $cb = sub {
+ $n = $len if $len < $n;
+ my $r = sysread($in, my $buf, $n);
+ if (!defined $r) {
+ err($env, "$f read error: $!");
+ drop_client($env);
+ } elsif ($r <= 0) {
+ err($env, "$f EOF with $len bytes left");
+ drop_client($env);
+ } else {
+ $len -= $r;
+ $fh->write($buf);
+ if ($len == 0) {
+ $fh->close;
+ } elsif ($next) {
+ # avoid recursion in Danga::Socket::write
+ unless ($nextq) {
+ $nextq = [];
+ Danga::Socket->AddTimer(0, *do_next);
+ }
+ # avoid buffering too much in case we have
+ # slow clients:
+ $n = 8192;
+ push @$nextq, $next;
+ return;
+ }
+ }
+ # all done, cleanup references:
+ $fh = $next = undef;
+ };
- my $env = $cgi->{env};
+ my $code = 200;
+ my @h = ('Content-Type', $type);
my $range = $env->{HTTP_RANGE};
if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) {
($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size);
return [ 416, \@h, [] ];
}
}
+ push @h, 'Content-Length', $len;
- push @h, 'Content-Type', $type, 'Content-Length', $len;
sub {
my ($res) = @_; # Plack callback
- my $fh = $res->([ $code, \@h ]);
- my $buf;
- my $n = 8192;
- while ($len > 0) {
- $n = $len if $len < $n;
- my $r = sysread($in, $buf, $n);
- last if (!defined($r) || $r <= 0);
- $len -= $r;
- $fh->write($buf);
+ $fh = $res->([ $code, \@h ]);
+ if (defined $env->{'pi-httpd.async'}) {
+ my $pi_http = $env->{'psgix.io'};
+ $next = sub { $pi_http->write($cb) };
+ $cb->(); # start it off!
+ } else {
+ $cb->() while $fh;
}
- die "$f truncated with $len bytes remaining\n" if $len;
- $fh->close;
}
}
my $input = $env->{'psgi.input'};
my $buf;
my $in;
- my $err = $env->{'psgi.errors'};
my $fd = eval { fileno($input) };
if (defined $fd && $fd >= 0) {
$in = $input;
}
my ($rpipe, $wpipe);
unless (pipe($rpipe, $wpipe)) {
- $err->print("error creating pipe: $! - going static\n");
+ err($env, "error creating pipe: $! - going static");
return;
}
my %env = %ENV;
my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) );
my $pid = spawn([qw(git http-backend)], \%env, \%rdr);
unless (defined $pid) {
- $err->print("error spawning: $! - going static\n");
+ err($env, "error spawning: $! - going static");
return;
}
$wpipe = $in = undef;
$buf = '';
my ($vin, $fh, $res);
$nr_running++;
+
+ # Danga::Socket users, we queue up the read_enable callback to
+ # fire after pending writes are complete:
+ my $pi_http = $env->{'psgix.io'};
+ my $read_enable = sub { $rpipe->watch_read(1) };
+ my $read_disable = sub {
+ $rpipe->watch_read(0);
+ $pi_http->write($read_enable);
+ };
+
my $end = sub {
if ($fh) {
$fh->close;
my $e = $pid == waitpid($pid, 0) ?
$? : "PID:$pid still running?";
if ($e) {
- $err->print("http-backend ($git_dir): $e\n");
- if (my $io = $env->{'psgix.io'}) {
- $io->close;
- }
+ err($env, "git http-backend ($git_dir): $e");
+ drop_client($env);
}
}
return unless $res;
}
my $e = $!;
$end->();
- $err->print("git http-backend ($git_dir): $e\n");
+ err($env, "git http-backend ($git_dir): $e\n");
};
my $cb = sub { # read git-http-backend output and stream to client
my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
if ($fh) { # stream body from git-http-backend to HTTP client
$fh->write($buf);
$buf = '';
+ $read_disable->() if $read_disable;
} elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers
my $h = $1;
my $code = 200;
$rpipe = $async->($rpipe, $cb);
sub { ($res) = @_ } # let Danga::Socket handle the rest.
} else { # synchronous loop for other PSGI servers
+ $read_enable = $read_disable = undef;
$vin = '';
vec($vin, fileno($rpipe), 1) = 1;
sub {
while (1) {
my $r = $input->read($buf, 8192);
unless (defined $r) {
- my $err = $env->{'psgi.errors'};
- $err->print("error reading input: $!\n");
+ err($env, "error reading input: $!");
return;
}
last if ($r == 0);
--- /dev/null
+#!/usr/bin/perl -w
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use PublicInbox::GitHTTPBackend;
+use PublicInbox::Git;
+use Plack::Builder;
+use Plack::Request;
+use BSD::Resource qw(getrusage);
+my $git_dir = $ENV{GIANT_GIT_DIR} or die 'GIANT_GIT_DIR not defined in env';
+my $git = PublicInbox::Git->new($git_dir);
+builder {
+ enable 'Chunked' if $ENV{TEST_CHUNK};
+ enable 'Head';
+ sub {
+ my ($env) = @_;
+ my $pr = Plack::Request->new($env);
+ if ($pr->path_info =~ m!\A/(.+)\z!s) {
+ PublicInbox::GitHTTPBackend::serve($pr, $git, $1);
+ } else {
+ my $ru = getrusage();
+ my $b = $ru->maxrss . "\n";
+ [ 200, [ qw(Content-Type text/plain Content-Length),
+ length($b) ], [ $b ] ]
+ }
+ }
+}
--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw/tempdir/;
+use IO::Socket;
+use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
+use POSIX qw(dup2 setsid);
+use Cwd qw(getcwd);
+
+my $git_dir = $ENV{GIANT_GIT_DIR};
+plan 'skip_all' => 'GIANT_GIT_DIR not defined' unless $git_dir;
+foreach my $mod (qw(Danga::Socket
+ Plack::Util Plack::Request Plack::Builder
+ HTTP::Date HTTP::Status Net::HTTP)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for git-http-backend.t" if $@;
+}
+my $psgi = getcwd()."/t/git-http-backend.psgi";
+my $tmpdir = tempdir('pi-git-http-backend-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $httpd = 'blib/script/public-inbox-httpd';
+my %opts = (
+ LocalAddr => '127.0.0.1',
+ ReuseAddr => 1,
+ Proto => 'tcp',
+ Type => SOCK_STREAM,
+ Listen => 1024,
+);
+my $sock = IO::Socket::INET->new(%opts);
+my $host = $sock->sockhost;
+my $port = $sock->sockport;
+my $pid;
+END { kill 'TERM', $pid if defined $pid };
+
+my $get_maxrss = sub {
+ my $http = Net::HTTP->new(Host => "$host:$port");
+ ok($http, 'Net::HTTP object created for maxrss');
+ $http->write_request(GET => '/');
+ my ($code, $mess, %h) = $http->read_response_headers;
+ is($code, 200, 'success reading maxrss');
+ my $n = $http->read_entity_body(my $buf, 256);
+ ok(defined $n, 'read response body');
+ like($buf, qr/\A\d+\n\z/, 'got memory response');
+ ok(int($buf) > 0, 'got non-zero memory response');
+ int($buf);
+};
+
+{
+ ok($sock, 'sock created');
+ $pid = fork;
+ if ($pid == 0) { # pretend to be systemd
+ fcntl($sock, F_SETFD, 0);
+ dup2(fileno($sock), 3) or die "dup2 failed: $!\n";
+ $ENV{LISTEN_PID} = $$;
+ $ENV{LISTEN_FDS} = 1;
+ $ENV{TEST_CHUNK} = '1';
+ exec $httpd, "--stdout=$out", "--stderr=$err", $psgi;
+ die "FAIL: $!\n";
+ }
+ ok(defined $pid, 'forked httpd process successfully');
+}
+my $mem_a = $get_maxrss->();
+
+SKIP: {
+ my $max = 0;
+ my $pack;
+ my $glob = "$git_dir/objects/pack/pack-*.pack";
+ foreach my $f (glob($glob)) {
+ my $n = -s $f;
+ if ($n > $max) {
+ $max = $n;
+ $pack = $f;
+ }
+ }
+ skip "no packs found in $git_dir" unless defined $pack;
+ if ($pack !~ m!(/objects/pack/pack-[a-f0-9]{40}.pack)\z!) {
+ skip "bad pack name: $pack";
+ }
+ my $url = $1;
+ my $http = Net::HTTP->new(Host => "$host:$port");
+ ok($http, 'Net::HTTP object created');
+ $http->write_request(GET => $url);
+ my ($code, $mess, %h) = $http->read_response_headers;
+ is(200, $code, 'got 200 success for pack');
+ is($max, $h{'Content-Length'}, 'got expected Content-Length for pack');
+ foreach my $i (1..3) {
+ sleep 1;
+ my $diff = $get_maxrss->() - $mem_a;
+ note "${diff}K memory increase after $i seconds";
+ ok($diff < 1024, 'no bloating caused by slow dumb client');
+ }
+}
+
+{
+ my $c = fork;
+ if ($c == 0) {
+ setsid();
+ exec qw(git clone -q --mirror), "http://$host:$port/",
+ "$tmpdir/mirror.git";
+ die "Failed start git clone: $!\n";
+ }
+ select(undef, undef, undef, 0.1);
+ foreach my $i (1..10) {
+ is(1, kill('STOP', -$c), 'signaled clone STOP');
+ sleep 1;
+ ok(kill('CONT', -$c), 'continued clone');
+ my $diff = $get_maxrss->() - $mem_a;
+ note "${diff}K memory increase after $i seconds";
+ ok($diff < 2048, 'no bloating caused by slow smart client');
+ }
+ ok(kill('CONT', -$c), 'continued clone');
+ is($c, waitpid($c, 0), 'reaped wayward slow clone');
+ is($?, 0, 'clone did not error out');
+ note 'clone done, fsck-ing clone result...';
+ is(0, system("git", "--git-dir=$tmpdir/mirror.git",
+ qw(fsck --no-progress)),
+ 'fsck did not report corruption');
+
+ my $diff = $get_maxrss->() - $mem_a;
+ note "${diff}K memory increase after smart clone";
+ ok($diff < 2048, 'no bloating caused by slow smart client');
+}
+
+{
+ ok(kill('TERM', $pid), 'killed httpd');
+ $pid = undef;
+ waitpid(-1, 0);
+}
+
+done_testing();