]> Sergey Matveev's repositories - public-inbox.git/commitdiff
qspawn: reinstate filter support, add gzip filter
authorEric Wong <e@yhbt.net>
Sat, 21 Mar 2020 02:03:44 +0000 (02:03 +0000)
committerEric Wong <e@yhbt.net>
Wed, 25 Mar 2020 01:48:34 +0000 (01:48 +0000)
We'll be supporting gzipped from sqlite3(1) dumps
for altid files in future commits.

In the future (and if we survive), we may replace
Plack::Middleware::Deflater with our own GzipFilter to work
better with asynchronous responses without relying on
memory-intensive anonymous subs.

MANIFEST
lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/GzipFilter.pm [new file with mode: 0644]
lib/PublicInbox/Qspawn.pm
t/gzip_filter.t [new file with mode: 0644]
t/httpd-corner.psgi
t/httpd-corner.t

index f077d722694e22ad8249d6060d6e4f293621f177..d53af77c0a2df64a1316bbf73c6754600850665a 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -111,6 +111,7 @@ lib/PublicInbox/Filter/Vger.pm
 lib/PublicInbox/GetlineBody.pm
 lib/PublicInbox/Git.pm
 lib/PublicInbox/GitHTTPBackend.pm
+lib/PublicInbox/GzipFilter.pm
 lib/PublicInbox/HTTP.pm
 lib/PublicInbox/HTTPD.pm
 lib/PublicInbox/HTTPD/Async.pm
@@ -233,6 +234,7 @@ t/filter_vger.t
 t/git-http-backend.psgi
 t/git.fast-import-data
 t/git.t
+t/gzip_filter.t
 t/hl_mod.t
 t/html_index.t
 t/httpd-corner.psgi
index 92719a8255c56f46fcf9312ac7c669a6f64104da..6becaaf5fc6af0b22d9bd3847e36257e83be8845 100644 (file)
@@ -13,13 +13,13 @@ use strict;
 use warnings;
 
 sub new {
-       my ($class, $rpipe, $end, $end_arg, $buf) = @_;
+       my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
        bless {
                rpipe => $rpipe,
                end => $end,
                end_arg => $end_arg,
-               buf => $buf,
-               filter => 0,
+               initial_buf => $buf,
+               filter => $filter,
        }, $class;
 }
 
@@ -30,19 +30,18 @@ sub DESTROY { $_[0]->close }
 
 sub getline {
        my ($self) = @_;
-       my $filter = $self->{filter};
-       return if $filter == -1; # last call was EOF
-
-       my $buf = delete $self->{buf}; # initial buffer
-       $buf = $self->{rpipe}->getline unless defined $buf;
-       $self->{filter} = -1 unless defined $buf; # set EOF for next call
+       my $rpipe = $self->{rpipe} or return; # EOF was set on previous call
+       my $buf = delete($self->{initial_buf}) // $rpipe->getline;
+       delete($self->{rpipe}) unless defined $buf; # set EOF for next call
+       if (my $filter = $self->{filter}) {
+               $buf = $filter->translate($buf);
+       }
        $buf;
 }
 
 sub close {
        my ($self) = @_;
-       my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
-       close $rpipe if $rpipe;
+       my ($end, $end_arg) = delete @$self{qw(end end_arg)};
        $end->($end_arg) if $end;
 }
 
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
new file mode 100644 (file)
index 0000000..d883130
--- /dev/null
@@ -0,0 +1,54 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Qspawn filter
+package PublicInbox::GzipFilter;
+use strict;
+use bytes (); # length
+use Compress::Raw::Zlib qw(Z_FINISH Z_OK);
+my %OPT = (-WindowBits => 15 + 16, -AppendOutput => 1);
+
+sub new {
+       my ($gz, $err) = Compress::Raw::Zlib::Deflate->new(%OPT);
+       $err == Z_OK or die "Deflate->new failed: $err";
+       bless { gz => $gz }, shift;
+}
+
+# for Qspawn if using $env->{'pi-httpd.async'}
+sub attach {
+       my ($self, $fh) = @_;
+       $self->{fh} = $fh;
+       $self
+}
+
+# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'}
+sub translate ($$) {
+       my $self = $_[0];
+       my $zbuf = delete($self->{zbuf});
+       if (defined $_[1]) { # my $buf = $_[1];
+               my $err = $self->{gz}->deflate($_[1], $zbuf);
+               die "gzip->deflate: $err" if $err != Z_OK;
+               return $zbuf if length($zbuf) >= 8192;
+
+               $self->{zbuf} = $zbuf;
+               '';
+       } else { # undef == EOF
+               my $err = $self->{gz}->flush($zbuf, Z_FINISH);
+               die "gzip->flush: $err" if $err != Z_OK;
+               $zbuf;
+       }
+}
+
+sub write {
+       # my $ret = bytes::length($_[1]); # XXX does anybody care?
+       $_[0]->{fh}->write(translate($_[0], $_[1]));
+}
+
+sub close {
+       my ($self) = @_;
+       my $fh = delete $self->{fh};
+       $fh->write(translate($self, undef));
+       $fh->close;
+}
+
+1;
index 63ec3648c933bdb7775a51a2fe90e3cfc235231c..52aea3eba472e0ff8bfe2b79e21893eb24c1ff56 100644 (file)
@@ -243,6 +243,7 @@ sub psgi_return_init_cb {
        my ($self) = @_;
        my $r = rd_hdr($self) or return;
        my $env = $self->{psgi_env};
+       my $filter = delete $env->{'qspawn.filter'};
        my $wcb = delete $env->{'qspawn.wcb'};
        my $async = delete $self->{async};
        if (scalar(@$r) == 3) { # error
@@ -257,6 +258,7 @@ sub psgi_return_init_cb {
        } elsif ($async) {
                # done reading headers, handoff to read body
                my $fh = $wcb->($r); # scalar @$r == 2
+               $fh = $filter->attach($fh) if $filter;
                $self->{fh} = $fh;
                $async->async_pass($env->{'psgix.io'}, $fh,
                                        delete($self->{hdr_buf}));
@@ -264,7 +266,7 @@ sub psgi_return_init_cb {
                require PublicInbox::GetlineBody;
                $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
                                        \&event_step, $self,
-                                       ${$self->{hdr_buf}});
+                                       ${$self->{hdr_buf}}, $filter);
                $wcb->($r);
        }
 
@@ -294,6 +296,10 @@ sub psgi_return_start { # may run later, much later...
 #                          psgi_return will return an anonymous
 #                          sub for the PSGI server to call
 #
+#   $env->{'qspawn.filter'} - filter object, responds to ->attach for
+#                             pi-httpd.async and ->translate for generic
+#                             PSGI servers
+#
 # $limiter - the Limiter object to use (uses the def_limiter if not given)
 #
 # $parse_hdr - Initial read function; often for parsing CGI header output.
diff --git a/t/gzip_filter.t b/t/gzip_filter.t
new file mode 100644 (file)
index 0000000..400214e
--- /dev/null
@@ -0,0 +1,37 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use Test::More;
+use IO::Handle (); # autoflush
+use Fcntl qw(SEEK_SET);
+use PublicInbox::TestCommon;
+require_mods(qw(Compress::Zlib IO::Uncompress::Gunzip));
+require_ok 'PublicInbox::GzipFilter';
+
+{
+       open my $fh, '+>', undef or die "open: $!";
+       open my $dup, '>&', $fh or die "dup $!";
+       $dup->autoflush(1);
+       my $filter = PublicInbox::GzipFilter->new->attach($dup);
+       ok($filter->write("hello"), 'wrote something');
+       ok($filter->write("world"), 'wrote more');
+       $filter->close;
+       seek($fh, 0, SEEK_SET) or die;
+       IO::Uncompress::Gunzip::gunzip($fh => \(my $buf));
+       is($buf, 'helloworld', 'buffer matches');
+}
+
+{
+       pipe(my ($r, $w)) or die "pipe: $!";
+       $w->autoflush(1);
+       close $r or die;
+       my $filter = PublicInbox::GzipFilter->new->attach($w);
+       my $sigpipe;
+       local $SIG{PIPE} = sub { $sigpipe = 1 };
+       open my $fh, '<', 'COPYING' or die "open(COPYING): $!";
+       my $buf = do { local $/; <$fh> };
+       while ($filter->write($buf .= rand)) {}
+       ok($sigpipe, 'got SIGPIPE');
+       close $w;
+}
+done_testing;
index 35d1216ec761e92134647ae50dec3a8bd8565f2a..f2427234c05afa8803868c25168af0c01a0d7a90 100644 (file)
@@ -85,6 +85,15 @@ my $app = sub {
                        close $null;
                        [ 200, [ qw(Content-Type application/octet-stream) ]];
                });
+       } elsif ($path eq '/psgi-return-gzip') {
+               require PublicInbox::Qspawn;
+               require PublicInbox::GzipFilter;
+               my $cmd = [qw(echo hello world)];
+               my $qsp = PublicInbox::Qspawn->new($cmd);
+               $env->{'qspawn.filter'} = PublicInbox::GzipFilter->new;
+               return $qsp->psgi_return($env, undef, sub {
+                       [ 200, [ qw(Content-Type application/octet-stream)]]
+               });
        } elsif ($path eq '/pid') {
                $code = 200;
                push @$body, "$$\n";
index c99e5ec76157d0611ad82344425ed17db50a6905..e50aa43682babfd66d006237eababf3c3863b054 100644 (file)
@@ -22,6 +22,7 @@ my $err = "$tmpdir/stderr.log";
 my $out = "$tmpdir/stdout.log";
 my $psgi = "./t/httpd-corner.psgi";
 my $sock = tcp_server() or die;
+my @zmods = qw(PublicInbox::GzipFilter IO::Uncompress::Gunzip);
 
 # make sure stdin is not a pipe for lsof test to check for leaking pipes
 open(STDIN, '<', '/dev/null') or die 'no /dev/null: $!';
@@ -324,6 +325,14 @@ SKIP: {
        close $fh or die "curl errored out \$?=$?";
        is($n, 30 * 1024 * 1024, 'got expected output from curl');
        is($non_zero, 0, 'read all zeros');
+
+       require_mods(@zmods, 1);
+       open $fh, '-|', qw(curl -sS), "$base/psgi-return-gzip" or die;
+       binmode $fh;
+       my $buf = do { local $/; <$fh> };
+       close $fh or die "curl errored out \$?=$?";
+       IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
+       is($out, "hello world\n");
 }
 
 {
@@ -596,6 +605,22 @@ SKIP: {
        is_deeply([], [keys %child], 'no extra pipes with -W0');
 };
 
+# ensure compatibility with other PSGI servers
+SKIP: {
+       require_mods(@zmods, qw(Plack::Test HTTP::Request::Common), 3);
+       use_ok 'HTTP::Request::Common';
+       use_ok 'Plack::Test';
+       my $app = require $psgi;
+       test_psgi($app, sub {
+               my ($cb) = @_;
+               my $req = GET('http://example.com/psgi-return-gzip');
+               my $res = $cb->($req);
+               my $buf = $res->content;
+               IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
+               is($out, "hello world\n");
+       });
+}
+
 done_testing();
 
 sub capture {