]> Sergey Matveev's repositories - public-inbox.git/commitdiff
qspawn|getlinebody: support streaming filters
authorEric Wong <e@80x24.org>
Wed, 11 Jan 2017 04:12:26 +0000 (04:12 +0000)
committerEric Wong <e@80x24.org>
Tue, 22 Jan 2019 03:38:39 +0000 (03:38 +0000)
This is intended for wrapping "git show" and "git diff"
processes in the future and to prevent it from monopolizing
callers.

This will us to better handle backpressure from gigantic
commits.

lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/Qspawn.pm

index ea07f3d6dd0de1d031012c62372754dbc91acd49..0a922fd271ccd53426fe3caa18c11be105f3d8cf 100644 (file)
@@ -13,8 +13,13 @@ use strict;
 use warnings;
 
 sub new {
-       my ($class, $rpipe, $end, $buf) = @_;
-       bless { rpipe => $rpipe, end => $end, buf => $buf }, $class;
+       my ($class, $rpipe, $end, $buf, $filter) = @_;
+       bless {
+               rpipe => $rpipe,
+               end => $end,
+               buf => $buf,
+               filter => $filter || 0,
+       }, $class;
 }
 
 # close should always be called after getline returns undef,
@@ -24,8 +29,13 @@ sub DESTROY { $_[0]->close }
 
 sub getline {
        my ($self) = @_;
+       my $filter = $self->{filter};
+       return if $filter == -1; # last call was EOF
+
        my $buf = delete $self->{buf}; # initial buffer
-       defined $buf ? $buf : $self->{rpipe}->getline;
+       $buf = $self->{rpipe}->getline unless defined $buf;
+       $self->{filter} = -1 unless defined $buf; # set EOF for next call
+       $filter ? $filter->($buf) : $buf;
 }
 
 sub close {
index b80dac1f3ca5bf829bfc3c04613a89155dc0c809..3247cd078cba71aaaa058798cff4840309ca7932 100644 (file)
@@ -9,6 +9,7 @@ package PublicInbox::Qspawn;
 use strict;
 use warnings;
 use PublicInbox::Spawn qw(popen_rd);
+require Plack::Util;
 my $def_limiter;
 
 sub new ($$$;) {
@@ -60,11 +61,25 @@ sub start {
        }
 }
 
+# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
+sub filter_fh ($$) {
+       my ($fh, $filter) = @_;
+       Plack::Util::inline_object(
+               close => sub {
+                       $fh->write($filter->(undef));
+                       $fh->close;
+               },
+               write => sub {
+                       $fh->write($filter->($_[0]));
+               });
+}
+
 sub psgi_return {
        my ($self, $env, $limiter, $parse_hdr) = @_;
        my ($fh, $rpipe);
        my $end = sub {
-               if (my $err = $self->finish) {
+               my $err = $self->finish;
+               if ($err && !$env->{'qspawn.quiet'}) {
                        $err = join(' ', @{$self->{args}->[0]}).": $err\n";
                        $env->{'psgi.errors'}->print($err);
                }
@@ -84,6 +99,7 @@ sub psgi_return {
        my $cb = sub {
                my $r = $rd_hdr->() or return;
                $rd_hdr = undef;
+               my $filter = delete $env->{'qspawn.filter'};
                if (scalar(@$r) == 3) { # error
                        if ($async) {
                                $async->close; # calls rpipe->close
@@ -94,11 +110,12 @@ sub psgi_return {
                        $res->($r);
                } elsif ($async) {
                        $fh = $res->($r); # scalar @$r == 2
+                       $fh = filter_fh($fh, $filter) if $filter;
                        $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
                } else { # for synchronous PSGI servers
                        require PublicInbox::GetlineBody;
                        $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-                                                               $buf);
+                                                               $buf, $filter);
                        $res->($r);
                }
        };