From 1dacadf3328241e7c4a378c4566c497d1e33326b Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 10 Jun 2020 07:04:23 +0000 Subject: [PATCH] git: idle rbuf for async We do this for the C10K-oriented HTTP/NNTP/IMAP processes, and we may support thousands of git-cat-file processes in the future. --- lib/PublicInbox/Git.pm | 47 ++++++++++++++++++---------------- lib/PublicInbox/GitAsyncCat.pm | 2 +- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index c5a3fa46..1c148086 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -123,7 +123,6 @@ sub _bidi_pipe { fcntl($out_w, 1031, 4096); fcntl($in_r, 1031, 4096) if $batch eq '--batch-check'; } - $self->{$batch} = \(my $rbuf = ''); $self->{$out} = $out_w; $self->{$in} = $in_r; } @@ -160,18 +159,20 @@ sub cat_async_step ($$) { my ($self, $inflight) = @_; die 'BUG: inflight empty or odd' if scalar(@$inflight) < 2; my ($cb, $arg) = splice(@$inflight, 0, 2); - my $head = my_readline($self->{in}, $self->{'--batch'}); - $head =~ / missing$/ and return - eval { $cb->(undef, undef, undef, undef, $arg) }; - - $head =~ /^([0-9a-f]{40}) (\S+) ([0-9]+)$/ or + my $rbuf = delete($self->{cat_rbuf}) // \(my $new = ''); + my ($bref, $oid, $type, $size); + my $head = my_readline($self->{in}, $rbuf); + if ($head =~ /^([0-9a-f]{40}) (\S+) ([0-9]+)$/) { + ($oid, $type, $size) = ($1, $2, $3 + 0); + $bref = my_read($self->{in}, $rbuf, $size + 1) or + fail($self, defined($bref) ? 'read EOF' : "read: $!"); + chop($$bref) eq "\n" or fail($self, 'LF missing after blob'); + } elsif ($head !~ / missing$/) { fail($self, "Unexpected result from async git cat-file: $head"); - my ($oid_hex, $type, $size) = ($1, $2, $3 + 0); - my $ret = my_read($self->{in}, $self->{'--batch'}, $size + 1); - fail($self, defined($ret) ? 'read EOF' : "read: $!") if !$ret; - chop($$ret) eq "\n" or fail($self, 'newline missing after blob'); - eval { $cb->($ret, $oid_hex, $type, $size, $arg) }; - warn "E: $oid_hex $@\n" if $@; + } + eval { $cb->($bref, $oid, $type, $size, $arg) }; + $self->{cat_rbuf} = $rbuf if $$rbuf ne ''; + warn "E: $oid: $@\n" if $@; } sub cat_async_wait ($) { @@ -188,12 +189,13 @@ sub batch_prepare ($) { sub cat_file { my ($self, $obj, $sizeref) = @_; - my ($retried, $head); + my ($retried, $head, $rbuf); cat_async_wait($self); again: batch_prepare($self); + $rbuf = delete($self->{cat_rbuf}) // \(my $new = ''); print { $self->{out} } $obj, "\n" or fail($self, "write error: $!"); - $head = my_readline($self->{in}, $self->{'--batch'}); + $head = my_readline($self->{in}, $rbuf); if ($head =~ / missing$/) { if (!$retried && alternates_changed($self)) { $retried = 1; @@ -207,7 +209,8 @@ again: my $size = $1 + 0; $$sizeref = $size if $sizeref; - my $ret = my_read($self->{in}, $self->{'--batch'}, $size + 1); + my $ret = my_read($self->{in}, $rbuf, $size + 1); + $self->{cat_rbuf} = $rbuf if $$rbuf ne ''; fail($self, defined($ret) ? 'read EOF' : "read: $!") if !$ret; chop($$ret) eq "\n" or fail($self, 'newline missing after blob'); $ret; @@ -217,7 +220,8 @@ sub check { my ($self, $obj) = @_; _bidi_pipe($self, qw(--batch-check in_c out_c pid_c err_c)); print { $self->{out_c} } $obj, "\n" or fail($self, "write error: $!"); - chomp(my $line = my_readline($self->{in_c}, $self->{'--batch-check'})); + my $rbuf = ''; # TODO: async + {chk_rbuf} + chomp(my $line = my_readline($self->{in_c}, \$rbuf)); my ($hex, $type, $size) = split(' ', $line); # Future versions of git.git may show 'ambiguous', but for now, @@ -227,8 +231,7 @@ sub check { return if $type eq 'missing' || $type eq 'ambiguous'; if ($hex eq 'dangling' || $hex eq 'notdir' || $hex eq 'loop') { - my $ret = my_read($self->{in_c}, $self->{'--batch-check'}, - $type + 1); + my $ret = my_read($self->{in_c}, \$rbuf, $type + 1); fail($self, defined($ret) ? 'read EOF' : "read: $!") if !$ret; return; } @@ -237,9 +240,9 @@ sub check { } sub _destroy { - my ($self, $batch, $in, $out, $pid, $err) = @_; + my ($self, $rbuf, $in, $out, $pid, $err) = @_; my $p = delete $self->{$pid} or return; - delete @$self{($batch, $in, $out)}; + delete @$self{($rbuf, $in, $out)}; delete $self->{$err} if $err; # `err_c' # PublicInbox::DS may not be loaded @@ -278,8 +281,8 @@ sub qx { sub cleanup { my ($self) = @_; cat_async_wait($self); - _destroy($self, qw(--batch in out pid)); - _destroy($self, qw(--batch-check in_c out_c pid_c err_c)); + _destroy($self, qw(cat_rbuf in out pid)); + _destroy($self, qw(chk_rbuf in_c out_c pid_c err_c)); !!($self->{pid} || $self->{pid_c}); } diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index f168169f..65e16121 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -30,7 +30,7 @@ sub event_step { my $inflight = $git->{inflight}; if (@$inflight) { $git->cat_async_step($inflight); - $self->requeue if @$inflight || length(${$git->{'--batch'}}); + $self->requeue if @$inflight || exists $git->{cat_rbuf}; } } -- 2.44.0