lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/Git.pm
lib/PublicInbox/GitHTTPBackend.pm
+lib/PublicInbox/GzipFilter.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD.pm
lib/PublicInbox/HTTPD/Async.pm
t/git-http-backend.psgi
t/git.fast-import-data
t/git.t
+t/gzip_filter.t
t/hl_mod.t
t/html_index.t
t/httpd-corner.psgi
use warnings;
sub new {
- my ($class, $rpipe, $end, $end_arg, $buf) = @_;
+ my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
bless {
rpipe => $rpipe,
end => $end,
end_arg => $end_arg,
- buf => $buf,
- filter => 0,
+ initial_buf => $buf,
+ filter => $filter,
}, $class;
}
sub getline {
my ($self) = @_;
- my $filter = $self->{filter};
- return if $filter == -1; # last call was EOF
-
- my $buf = delete $self->{buf}; # initial buffer
- $buf = $self->{rpipe}->getline unless defined $buf;
- $self->{filter} = -1 unless defined $buf; # set EOF for next call
+ my $rpipe = $self->{rpipe} or return; # EOF was set on previous call
+ my $buf = delete($self->{initial_buf}) // $rpipe->getline;
+ delete($self->{rpipe}) unless defined $buf; # set EOF for next call
+ if (my $filter = $self->{filter}) {
+ $buf = $filter->translate($buf);
+ }
$buf;
}
sub close {
my ($self) = @_;
- my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
- close $rpipe if $rpipe;
+ my ($end, $end_arg) = delete @$self{qw(end end_arg)};
$end->($end_arg) if $end;
}
--- /dev/null
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Qspawn filter
+package PublicInbox::GzipFilter;
+use strict;
+use bytes (); # length
+use Compress::Raw::Zlib qw(Z_FINISH Z_OK);
+my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1);
+
+sub new {
+ my ($gz, $err) = Compress::Raw::Zlib::Deflate->new(%OPT);
+ $err == Z_OK or die "Deflate->new failed: $err";
+ bless { gz => $gz }, shift;
+}
+
+# for Qspawn if using $env->{'pi-httpd.async'}
+sub attach {
+ my ($self, $fh) = @_;
+ $self->{fh} = $fh;
+ $self
+}
+
+# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'}
+sub translate ($$) {
+ my $self = $_[0];
+ my $zbuf = delete($self->{zbuf});
+ if (defined $_[1]) { # my $buf = $_[1];
+ my $err = $self->{gz}->deflate($_[1], $zbuf);
+ die "gzip->deflate: $err" if $err != Z_OK;
+ return $zbuf if length($zbuf) >= 8192;
+
+ $self->{zbuf} = $zbuf;
+ '';
+ } else { # undef == EOF
+ my $err = $self->{gz}->flush($zbuf, Z_FINISH);
+ die "gzip->flush: $err" if $err != Z_OK;
+ $zbuf;
+ }
+}
+
+sub write {
+ # my $ret = bytes::length($_[1]); # XXX does anybody care?
+ $_[0]->{fh}->write(translate($_[0], $_[1]));
+}
+
+sub close {
+ my ($self) = @_;
+ my $fh = delete $self->{fh};
+ $fh->write(translate($self, undef));
+ $fh->close;
+}
+
+1;
my ($self) = @_;
my $r = rd_hdr($self) or return;
my $env = $self->{psgi_env};
+ my $filter = delete $env->{'qspawn.filter'};
my $wcb = delete $env->{'qspawn.wcb'};
my $async = delete $self->{async};
if (scalar(@$r) == 3) { # error
} elsif ($async) {
# done reading headers, handoff to read body
my $fh = $wcb->($r); # scalar @$r == 2
+ $fh = $filter->attach($fh) if $filter;
$self->{fh} = $fh;
$async->async_pass($env->{'psgix.io'}, $fh,
delete($self->{hdr_buf}));
require PublicInbox::GetlineBody;
$r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
\&event_step, $self,
- ${$self->{hdr_buf}});
+ ${$self->{hdr_buf}}, $filter);
$wcb->($r);
}
# psgi_return will return an anonymous
# sub for the PSGI server to call
#
+# $env->{'qspawn.filter'} - filter object, responds to ->attach for
+# pi-httpd.async and ->translate for generic
+# PSGI servers
+#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
# $parse_hdr - Initial read function; often for parsing CGI header output.
--- /dev/null
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use Test::More;
+use IO::Handle (); # autoflush
+use Fcntl qw(SEEK_SET);
+use PublicInbox::TestCommon;
+require_mods(qw(Compress::Zlib IO::Uncompress::Gunzip));
+require_ok 'PublicInbox::GzipFilter';
+
+{
+ open my $fh, '+>', undef or die "open: $!";
+ open my $dup, '>&', $fh or die "dup $!";
+ $dup->autoflush(1);
+ my $filter = PublicInbox::GzipFilter->new->attach($dup);
+ ok($filter->write("hello"), 'wrote something');
+ ok($filter->write("world"), 'wrote more');
+ $filter->close;
+ seek($fh, 0, SEEK_SET) or die;
+ IO::Uncompress::Gunzip::gunzip($fh => \(my $buf));
+ is($buf, 'helloworld', 'buffer matches');
+}
+
+{
+ pipe(my ($r, $w)) or die "pipe: $!";
+ $w->autoflush(1);
+ close $r or die;
+ my $filter = PublicInbox::GzipFilter->new->attach($w);
+ my $sigpipe;
+ local $SIG{PIPE} = sub { $sigpipe = 1 };
+ open my $fh, '<', 'COPYING' or die "open(COPYING): $!";
+ my $buf = do { local $/; <$fh> };
+ while ($filter->write($buf .= rand)) {}
+ ok($sigpipe, 'got SIGPIPE');
+ close $w;
+}
+done_testing;
close $null;
[ 200, [ qw(Content-Type application/octet-stream) ]];
});
+ } elsif ($path eq '/psgi-return-gzip') {
+ require PublicInbox::Qspawn;
+ require PublicInbox::GzipFilter;
+ my $cmd = [qw(echo hello world)];
+ my $qsp = PublicInbox::Qspawn->new($cmd);
+ $env->{'qspawn.filter'} = PublicInbox::GzipFilter->new;
+ return $qsp->psgi_return($env, undef, sub {
+ [ 200, [ qw(Content-Type application/octet-stream)]]
+ });
} elsif ($path eq '/pid') {
$code = 200;
push @$body, "$$\n";
my $out = "$tmpdir/stdout.log";
my $psgi = "./t/httpd-corner.psgi";
my $sock = tcp_server() or die;
+my @zmods = qw(PublicInbox::GzipFilter IO::Uncompress::Gunzip);
# make sure stdin is not a pipe for lsof test to check for leaking pipes
open(STDIN, '<', '/dev/null') or die 'no /dev/null: $!';
close $fh or die "curl errored out \$?=$?";
is($n, 30 * 1024 * 1024, 'got expected output from curl');
is($non_zero, 0, 'read all zeros');
+
+ require_mods(@zmods, 1);
+ open $fh, '-|', qw(curl -sS), "$base/psgi-return-gzip" or die;
+ binmode $fh;
+ my $buf = do { local $/; <$fh> };
+ close $fh or die "curl errored out \$?=$?";
+ IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
+ is($out, "hello world\n");
}
{
is_deeply([], [keys %child], 'no extra pipes with -W0');
};
+# ensure compatibility with other PSGI servers
+SKIP: {
+ require_mods(@zmods, qw(Plack::Test HTTP::Request::Common), 3);
+ use_ok 'HTTP::Request::Common';
+ use_ok 'Plack::Test';
+ my $app = require $psgi;
+ test_psgi($app, sub {
+ my ($cb) = @_;
+ my $req = GET('http://example.com/psgi-return-gzip');
+ my $res = $cb->($req);
+ my $buf = $res->content;
+ IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
+ is($out, "hello world\n");
+ });
+}
+
done_testing();
sub capture {