]> Sergey Matveev's repositories - public-inbox.git/commitdiff
git: fix asynchronous batching for deep pipelines
authorEric Wong <e@80x24.org>
Wed, 4 Jan 2023 03:49:34 +0000 (03:49 +0000)
committerEric Wong <e@80x24.org>
Wed, 4 Jan 2023 19:45:08 +0000 (19:45 +0000)
...By using non-blocking pipe writes.  This avoids problems for
musl (and other libc) where getdelim(3) used by `git cat-file --batch*'
uses a smaller input buffer than glibc or FreeBSD libc.

My key mistake was our check against MAX_INFLIGHT is only useful
for the initial batch of requests.  It is not useful for
subsequent requests since git will drain the pipe at
unpredictable rates due to libc differences.

To fix this problem, I initially tried to drain the read pipe
as long as readable data was pending.  However, reading git
output without giving git more work would also limit parallelism
opportunities since we don't want git to sit idle, either.  This
change ensures we keep both pipes reasonably full to reduce
stalls and maximize parallelism between git and public-inbox.

While the limit set a few weeks ago in commit
56e6e587745c (git: cap MAX_INFLIGHT value to POSIX minimum, 2022-12-21)
remains in place, any higher or lower limit will work.  It may
be worth it to use an even lower limit to improve interactivity
w.r.t. Ctrl-C interrupts.

I've tested the pre-56e6e587745c and even higher values on an
Alpine VM in the GCC Farm <https://cfarm.tetaneutral.net>

Reported-by: Chris Brannon <chris@the-brannons.com>
Link: https://public-inbox.org/meta/87edssl7u0.fsf@the-brannons.com/T/
lib/PublicInbox/Git.pm

index a1af776be4d62b27186ef4e112b6c46ace89e2c1..86b80a4edb877152f566c00a7b804952516e907b 100644 (file)
@@ -145,6 +145,7 @@ sub _bidi_pipe {
                fcntl($out_w, 1031, 4096);
                fcntl($in_r, 1031, 4096) if $batch eq '--batch-check';
        }
+       $out_w->blocking(0);
        $self->{$out} = $out_w;
        $self->{$in} = $in_r;
 }
@@ -203,7 +204,9 @@ sub cat_async_retry ($$) {
        for (my $i = 0; $i < @$inflight; $i += 3) {
                $buf .= "$inflight->[$i]\n";
        }
+       $self->{out}->blocking(1); # brand new pipe, should never block
        print { $self->{out} } $buf or $self->fail("write error: $!");
+       $self->{out}->blocking(0);
        my $req = shift @$inflight;
        unshift(@$inflight, \$req); # \$ref to indicate retried
 
@@ -305,13 +308,27 @@ sub check_async_begin ($) {
        $self->{inflight_c} = [];
 }
 
+sub write_all {
+       my ($self, $out, $buf, $read_step, $inflight) = @_;
+       $read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT;
+       do {
+               my $w = syswrite($out, $buf);
+               if (defined $w) {
+                       return if $w == length($buf);
+                       warn "chop: $w";
+                       substr($buf, 0, $w, ''); # sv_chop
+               } elsif ($! != EAGAIN) {
+                       $self->fail("write: $!");
+               } else { warn "E: $!" }
+               $read_step->($self, $inflight);
+       } while (1);
+}
+
 sub check_async ($$$$) {
        my ($self, $oid, $cb, $arg) = @_;
        my $inflight_c = $self->{inflight_c} // check_async_begin($self);
-       while (scalar(@$inflight_c) >= MAX_INFLIGHT) {
-               check_async_step($self, $inflight_c);
-       }
-       print { $self->{out_c} } $oid, "\n" or $self->fail("write error: $!");
+       write_all($self, $self->{out_c}, $oid."\n",
+               \&check_async_step, $inflight_c);
        push(@$inflight_c, $oid, $cb, $arg);
 }
 
@@ -496,10 +513,7 @@ sub cat_async_begin {
 sub cat_async ($$$;$) {
        my ($self, $oid, $cb, $arg) = @_;
        my $inflight = $self->{inflight} // cat_async_begin($self);
-       while (scalar(@$inflight) >= MAX_INFLIGHT) {
-               cat_async_step($self, $inflight);
-       }
-       print { $self->{out} } $oid, "\n" or $self->fail("write error: $!");
+       write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight);
        push(@$inflight, $oid, $cb, $arg);
 }