use Fcntl qw(:seek);
use IO::File;
use PublicInbox::Spawn qw(spawn);
+use HTTP::Date qw(time2str);
# TODO: make configurable, but keep in mind it's better to have
# multiple -httpd worker processes which are already scaled to
my $BIN = join('|', @binary);
my $TEXT = join('|', @text);
-sub r {
- [ $_[0] , [qw(Content-Type text/plain Content-Length 0) ], [] ]
+my @no_cache = ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT',
+ 'Pragma', 'no-cache',
+ 'Cache-Control', 'no-cache, max-age=0, must-revalidate');
+
+my $nextq;
+sub do_next () {
+ my $q = $nextq;
+ $nextq = undef;
+ while (my $cb = shift @$q) {
+ $cb->(); # this may redefine nextq
+ }
+}
+
+sub r ($) {
+ my ($s) = @_;
+ [ $s, [qw(Content-Type text/plain Content-Length 0), @no_cache ], [] ]
}
sub serve {
serve_dumb($cgi, $git, $path);
}
+sub err ($@) {
+ my ($env, @msg) = @_;
+ $env->{'psgi.errors'}->print(@msg, "\n");
+}
+
+sub drop_client ($) {
+ if (my $io = $_[0]->{'psgix.io'}) {
+ $io->close; # this is Danga::Socket::close
+ }
+}
+
sub serve_dumb {
my ($cgi, $git, $path) = @_;
+ my @h;
my $type;
if ($path =~ /\A(?:$BIN)\z/o) {
$type = 'application/octet-stream';
+ push @h, 'Expires', time2str(time + 31536000);
+ push @h, 'Cache-Control', 'public, max-age=31536000';
} elsif ($path =~ /\A(?:$TEXT)\z/o) {
$type = 'text/plain';
+ push @h, @no_cache;
} else {
return r(404);
}
+
my $f = "$git->{git_dir}/$path";
- return r(404) unless -f $f && -r _;
+ return r(404) unless -f $f && -r _; # just in case it's a FIFO :P
my @st = stat(_);
my $size = $st[7];
+ my $env = $cgi->{env};
- # TODO: If-Modified-Since and Last-Modified
+ # TODO: If-Modified-Since and Last-Modified?
open my $in, '<', $f or return r(404);
- my $code = 200;
my $len = $size;
- my @h;
+ my $n = 65536; # try to negotiate a big TCP window, first
+ my ($next, $fh);
+ my $cb = sub {
+ $n = $len if $len < $n;
+ my $r = sysread($in, my $buf, $n);
+ if (!defined $r) {
+ err($env, "$f read error: $!");
+ drop_client($env);
+ } elsif ($r <= 0) {
+ err($env, "$f EOF with $len bytes left");
+ drop_client($env);
+ } else {
+ $len -= $r;
+ $fh->write($buf);
+ if ($len == 0) {
+ $fh->close;
+ } elsif ($next) {
+ # avoid recursion in Danga::Socket::write
+ unless ($nextq) {
+ $nextq = [];
+ Danga::Socket->AddTimer(0, *do_next);
+ }
+ # avoid buffering too much in case we have
+ # slow clients:
+ $n = 8192;
+ push @$nextq, $next;
+ return;
+ }
+ }
+ # all done, cleanup references:
+ $fh = $next = undef;
+ };
- my $env = $cgi->{env};
+ my $code = 200;
+ push @h, 'Content-Type', $type;
my $range = $env->{HTTP_RANGE};
if (defined $range && $range =~ /\bbytes=(\d*)-(\d*)\z/) {
($code, $len) = prepare_range($cgi, $in, \@h, $1, $2, $size);
return [ 416, \@h, [] ];
}
}
+ push @h, 'Content-Length', $len;
- push @h, 'Content-Type', $type, 'Content-Length', $len;
sub {
my ($res) = @_; # Plack callback
- my $fh = $res->([ $code, \@h ]);
- my $buf;
- my $n = 8192;
- while ($len > 0) {
- $n = $len if $len < $n;
- my $r = sysread($in, $buf, $n);
- last if (!defined($r) || $r <= 0);
- $len -= $r;
- $fh->write($buf);
+ $fh = $res->([ $code, \@h ]);
+ if (defined $env->{'pi-httpd.async'}) {
+ my $pi_http = $env->{'psgix.io'};
+ $next = sub { $pi_http->write($cb) };
+ $cb->(); # start it off!
+ } else {
+ $cb->() while $fh;
}
- die "$f truncated with $len bytes remaining\n" if $len;
- $fh->close;
}
}
my $input = $env->{'psgi.input'};
my $buf;
my $in;
- my $err = $env->{'psgi.errors'};
my $fd = eval { fileno($input) };
if (defined $fd && $fd >= 0) {
$in = $input;
}
my ($rpipe, $wpipe);
unless (pipe($rpipe, $wpipe)) {
- $err->print("error creating pipe: $! - going static\n");
+ err($env, "error creating pipe: $! - going static");
return;
}
my %env = %ENV;
my %rdr = ( 0 => fileno($in), 1 => fileno($wpipe) );
my $pid = spawn([qw(git http-backend)], \%env, \%rdr);
unless (defined $pid) {
- $err->print("error spawning: $! - going static\n");
+ err($env, "error spawning: $! - going static");
return;
}
$wpipe = $in = undef;
$buf = '';
my ($vin, $fh, $res);
$nr_running++;
+
+ # Danga::Socket users, we queue up the read_enable callback to
+ # fire after pending writes are complete:
+ my $pi_http = $env->{'psgix.io'};
+ my $read_enable = sub { $rpipe->watch_read(1) };
+ my $read_disable = sub {
+ $rpipe->watch_read(0);
+ $pi_http->write($read_enable);
+ };
+
my $end = sub {
if ($fh) {
$fh->close;
if (defined $pid) {
my $e = $pid == waitpid($pid, 0) ?
$? : "PID:$pid still running?";
- if ($e) {
- $err->print("http-backend ($git_dir): $e\n");
- if (my $io = $env->{'psgix.io'}) {
- $io->close;
- }
- }
+ err($env, "git http-backend ($git_dir): $e") if $e;
}
return unless $res;
my $dumb = serve_dumb($cgi, $git, $path);
}
my $e = $!;
$end->();
- $err->print("git http-backend ($git_dir): $e\n");
+ err($env, "git http-backend ($git_dir): $e\n");
};
my $cb = sub { # read git-http-backend output and stream to client
my $r = $rpipe ? $rpipe->sysread($buf, 8192, length($buf)) : 0;
if ($fh) { # stream body from git-http-backend to HTTP client
$fh->write($buf);
$buf = '';
+ $read_disable->() if $read_disable;
} elsif ($buf =~ s/\A(.*?)\r\n\r\n//s) { # parse headers
my $h = $1;
my $code = 200;
$rpipe = $async->($rpipe, $cb);
sub { ($res) = @_ } # let Danga::Socket handle the rest.
} else { # synchronous loop for other PSGI servers
+ $read_enable = $read_disable = undef;
$vin = '';
vec($vin, fileno($rpipe), 1) = 1;
sub {
while (1) {
my $r = $input->read($buf, 8192);
unless (defined $r) {
- my $err = $env->{'psgi.errors'};
- $err->print("error reading input: $!\n");
+ err($env, "error reading input: $!");
return;
}
last if ($r == 0);