]> Sergey Matveev's repositories - public-inbox.git/commitdiff
mboxgz: do asynchronous git blob retrievals
authorEric Wong <e@yhbt.net>
Sun, 5 Jul 2020 23:27:30 +0000 (23:27 +0000)
committerEric Wong <e@yhbt.net>
Mon, 6 Jul 2020 20:01:15 +0000 (20:01 +0000)
This lets the -httpd worker process make better use of time
instead of waiting for git-cat-file to respond.  With 4 jobs in
the new test case against a clone of
<https://public-inbox.org/meta/>, a speedup of 10-12% is shown.
Even a single job shows a 2-5% improvement on an SSD.

MANIFEST
lib/PublicInbox/HTTP.pm
lib/PublicInbox/MboxGz.pm
xt/httpd-async-stream.t [new file with mode: 0644]

index dcd7a7e5f7b1b2645212dd3e6719b05b7fc8d347..9b0f50203fedeb79d984fb071f2fc8804e9d03f9 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -368,6 +368,7 @@ xt/cmp-msgview.t
 xt/eml_check_limits.t
 xt/git-http-backend.t
 xt/git_async_cmp.t
+xt/httpd-async-stream.t
 xt/imapd-mbsync-oimap.t
 xt/imapd-validate.t
 xt/mem-imapd-tls.t
index 8281746538e2df420a7003a8fc62cdd2dab159a9..5844ef440f1e8dd324ce6b67401de7c99e696c8e 100644 (file)
@@ -488,6 +488,13 @@ sub busy () {
        ($self->{rbuf} || exists($self->{env}) || $self->{wbuf});
 }
 
+# runs $cb on the next iteration of the event loop at earliest
+sub next_step {
+       my ($self, $cb) = @_;
+       return unless exists $self->{sock};
+       $self->requeue if 1 == push(@{$self->{wbuf}}, $cb);
+}
+
 # Chunked and Identity packages are used for writing responses.
 # They may be exposed to the PSGI application when the PSGI app
 # returns a CODE ref for "push"-based responses
index 535ef96c9eb448237a7170813a01648d61212f33..8c9010afb549c5ceebe7f4bba78a13a5d4e13e03 100644 (file)
@@ -6,6 +6,9 @@ use parent 'PublicInbox::GzipFilter';
 use PublicInbox::Eml;
 use PublicInbox::Hval qw/to_filename/;
 use PublicInbox::Mbox;
+use PublicInbox::GitAsyncCat;
+*msg_hdr = \&PublicInbox::Mbox::msg_hdr;
+*msg_body = \&PublicInbox::Mbox::msg_body;
 
 sub new {
        my ($class, $ctx, $cb) = @_;
@@ -17,33 +20,81 @@ sub new {
        }, $class;
 }
 
+# this is public-inbox-httpd-specific
+sub mboxgz_blob_cb { # git->cat_async callback
+       my ($bref, $oid, $type, $size, $self) = @_;
+       my $http = $self->{ctx}->{env}->{'psgix.io'} or return; # client abort
+       my $smsg = delete $self->{smsg} or die 'BUG: no smsg';
+       if (!defined($oid)) {
+               # it's possible to have TOCTOU if an admin runs
+               # public-inbox-(edit|purge), just move onto the next message
+               return $http->next_step(\&async_next);
+       } else {
+               $smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid";
+       }
+       $self->zmore(msg_hdr($self->{ctx},
+                               PublicInbox::Eml->new($bref)->header_obj,
+                               $smsg->{mid}));
+
+       # PublicInbox::HTTP::{Chunked,Identity}::write
+       $self->{http_out}->write($self->translate(msg_body($$bref)));
+
+       $http->next_step(\&async_next);
+}
+
+# this is public-inbox-httpd-specific
+sub async_step ($) {
+       my ($self) = @_;
+       if (my $smsg = $self->{smsg} = $self->{cb}->($self->{ctx})) {
+               git_async_cat($self->{ctx}->{-inbox}->git, $smsg->{blob},
+                               \&mboxgz_blob_cb, $self);
+       } elsif (my $out = delete $self->{http_out}) {
+               $out->write($self->zflush);
+               $out->close;
+       }
+}
+
+# called by PublicInbox::DS::write
+sub async_next {
+       my ($http) = @_; # PublicInbox::HTTP
+       async_step($http->{forward});
+}
+
+# called by PublicInbox::HTTP::close, or any other PSGI server
+sub close { !!delete($_[0]->{http_out}) }
+
 sub response {
        my ($class, $ctx, $cb, $fn) = @_;
-       my $body = $class->new($ctx, $cb);
+       my $self = $class->new($ctx, $cb);
        # http://www.iana.org/assignments/media-types/application/gzip
        $fn = defined($fn) && $fn ne '' ? to_filename($fn) : 'no-subject';
        my $h = [ qw(Content-Type application/gzip),
                'Content-Disposition', "inline; filename=$fn.mbox.gz" ];
-       [ 200, $h, $body ];
+       if ($ctx->{env}->{'pi-httpd.async'}) {
+               sub {
+                       my ($wcb) = @_; # -httpd provided write callback
+                       $self->{http_out} = $wcb->([200, $h]);
+                       $self->{ctx}->{env}->{'psgix.io'}->{forward} = $self;
+                       async_step($self); # start stepping
+               };
+       } else { # generic PSGI
+               [ 200, $h, $self ];
+       }
 }
 
-# called by Plack::Util::foreach or similar
+# called by Plack::Util::foreach or similar (generic PSGI)
 sub getline {
        my ($self) = @_;
        my $ctx = $self->{ctx} or return;
        while (my $smsg = $self->{cb}->($ctx)) {
                my $mref = $ctx->{-inbox}->msg_by_smsg($smsg) or next;
                my $h = PublicInbox::Eml->new($mref)->header_obj;
-               $self->zmore(
-                       PublicInbox::Mbox::msg_hdr($ctx, $h, $smsg->{mid})
-               );
-               return $self->translate(PublicInbox::Mbox::msg_body($$mref));
+               $self->zmore(msg_hdr($ctx, $h, $smsg->{mid}));
+               return $self->translate(msg_body($$mref));
        }
        # signal that we're done and can return undef next call:
        delete $self->{ctx};
        $self->zflush;
 }
 
-sub close {} # noop
-
 1;
diff --git a/xt/httpd-async-stream.t b/xt/httpd-async-stream.t
new file mode 100644 (file)
index 0000000..29bcb61
--- /dev/null
@@ -0,0 +1,99 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# Expensive test to validate compression and TLS.
+use strict;
+use Test::More;
+use PublicInbox::TestCommon;
+use PublicInbox::DS qw(now);
+use PublicInbox::Spawn qw(which popen_rd);
+use Digest::MD5;
+use POSIX qw(_exit);
+my $inboxdir = $ENV{GIANT_INBOX_DIR};
+plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inboxdir;
+my $curl = which('curl') or plan skip_all => "curl(1) missing for $0";
+my ($tmpdir, $for_destroy) = tmpdir();
+require_mods(qw(DBD::SQLite));
+my $JOBS = $ENV{TEST_JOBS} // 4;
+diag "TEST_JOBS=$JOBS";
+
+my $make_local_server = sub {
+       my $pi_config = "$tmpdir/config";
+       open my $fh, '>', $pi_config or die "open($pi_config): $!";
+       print $fh <<"" or die "print $pi_config: $!";
+[publicinbox "test"]
+inboxdir = $inboxdir
+address = test\@example.com
+
+       close $fh or die "close($pi_config): $!";
+       my ($out, $err) = ("$tmpdir/out", "$tmpdir/err");
+       for ($out, $err) {
+               open my $fh, '>', $_ or die "truncate: $!";
+       }
+       my $http = tcp_server();
+       my $rdr = { 3 => $http };
+
+       # not using multiple workers, here, since we want to increase
+       # the chance of tripping concurrency bugs within PublicInbox/HTTP*.pm
+       my $cmd = [ '-httpd', "--stdout=$out", "--stderr=$err", '-W0' ];
+       my $host_port = $http->sockhost.':'.$http->sockport;
+       push @$cmd, "-lhttp://$host_port";
+       my $url = "$host_port/test/all.mbox.gz";
+       print STDERR "# CMD ". join(' ', @$cmd). "\n";
+       my $env = { PI_CONFIG => $pi_config };
+       (start_script($cmd, $env, $rdr), $url);
+};
+
+my ($td, $url) = $make_local_server->();
+
+my $do_get_all = sub {
+       my ($job) = @_;
+       local $SIG{__DIE__} = sub { print STDERR $job, ': ', @_; _exit(1) };
+       my $dig = Digest::MD5->new;
+       my ($buf, $nr);
+       my $bytes = 0;
+       my $t0 = now();
+       my ($rd, $pid) = popen_rd([$curl, qw(-HHost:example.com -sSf), $url]);
+       while (1) {
+               $nr = sysread($rd, $buf, 65536);
+               last if !$nr;
+               $dig->add($buf);
+               $bytes += $nr;
+       }
+       my $res = $dig->hexdigest;
+       my $elapsed = sprintf('%0.3f', now() - $t0);
+       close $rd or die "close curl failed: $!\n";
+       waitpid($pid, 0) == $pid or die "waitpid failed: $!\n";
+       $? == 0 or die "curl failed: $?\n";
+       print STDERR "# $job $$ ($?) $res (${elapsed}s) $bytes bytes\n";
+       $res;
+};
+
+my (%pids, %res);
+for my $job (1..$JOBS) {
+       pipe(my ($r, $w)) or die;
+       my $pid = fork;
+       if ($pid == 0) {
+               close $r or die;
+               my $res = $do_get_all->($job);
+               print $w $res or die;
+               close $w or die;
+               _exit(0);
+       }
+       close $w or die;
+       $pids{$pid} = [ $job, $r ];
+}
+
+while (scalar keys %pids) {
+       my $pid = waitpid(-1, 0) or next;
+       my $child = delete $pids{$pid} or next;
+       my ($job, $rpipe) = @$child;
+       is($?, 0, "$job done");
+       my $sum = do { local $/; <$rpipe> };
+       push @{$res{$sum}}, $job;
+}
+is(scalar keys %res, 1, 'all got the same result');
+$td->kill;
+$td->join;
+is($?, 0, 'no error on -httpd exit');
+done_testing;