X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FGitAsyncCat.pm;h=dc97af16c09ed3e06ba1b727a5563dd3f32ea251;hb=0d38f65c490466837ae091afa7a7b6f59d04ce7c;hp=e618d366a8bc31dbbcafd29b702160787d267fcb;hpb=2c6e8b10e3f9f3b047009126426ce634c8f29a23;p=public-inbox.git diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index e618d366..dc97af16 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -1,42 +1,94 @@ # Copyright (C) 2020 all contributors # License: AGPL-3.0+ # -# internal class used by PublicInbox::Git + Danga::Socket +# internal class used by PublicInbox::Git + PublicInbox::DS # This parses the output pipe of "git cat-file --batch" -# -# Note: this does NOT set the non-blocking flag, we expect `git cat-file' -# to be a local process, and git won't start writing a blob until it's -# fully read. So minimize context switching and read as much as possible -# and avoid holding a buffer in our heap any longer than it has to live. package PublicInbox::GitAsyncCat; use strict; use parent qw(PublicInbox::DS Exporter); +use POSIX qw(WNOHANG); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -our @EXPORT = qw(git_async_cat); - -sub _add { - my ($class, $git) = @_; - $git->batch_prepare; - my $self = bless { git => $git }, $class; - $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); - \undef; # this is a true ref() +our @EXPORT = qw(git_async_cat git_async_prefetch); +use PublicInbox::Git (); + +our $GCF2C; # singleton PublicInbox::Gcf2Client + +sub close { + my ($self) = @_; + + if (my $gitish = delete $self->{gitish}) { + PublicInbox::Git::cat_async_abort($gitish); + } + $self->SUPER::close; # PublicInbox::DS::close } sub event_step { my ($self) = @_; - my $git = $self->{git}; - return $self->close if ($git->{in} // 0) != ($self->{sock} // 1); - my $inflight = $git->{inflight}; + my $gitish = $self->{gitish} or return; + return $self->close if ($gitish->{in} // 0) != ($self->{sock} // 1); + my $inflight = $gitish->{inflight}; if ($inflight && @$inflight) { - $git->cat_async_step($inflight); - $self->requeue if @$inflight || exists $git->{cat_rbuf}; + $gitish->cat_async_step($inflight); + + # child death? + if (($gitish->{in} // 0) != ($self->{sock} // 1)) { + $self->close; + } elsif (@$inflight || exists $gitish->{cat_rbuf}) { + # ok, more to do, requeue for fairness + $self->requeue; + } + } elsif ((my $pid = waitpid($gitish->{pid}, WNOHANG)) > 0) { + # May happen if the child process is killed by a BOFH + # (or segfaults) + delete $gitish->{pid}; + warn "E: gitish $pid exited with \$?=$?\n"; + $self->close; } } sub git_async_cat ($$$$) { my ($git, $oid, $cb, $arg) = @_; - $git->cat_async($oid, $cb, $arg); - $git->{async_cat} //= _add(__PACKAGE__, $git); + my $gitish = $GCF2C //= eval { + require PublicInbox::Gcf2; + require PublicInbox::Gcf2Client; + PublicInbox::Gcf2Client::new(); + } // 0; # 0: do not retry if libgit2 or Inline::C are missing + if ($gitish) { # Gcf2 active, {inflight} may be unset due to errors + $GCF2C->{inflight} or + $gitish = $GCF2C = PublicInbox::Gcf2Client::new(); + $oid .= " $git->{git_dir}"; + } else { + $gitish = $git; + } + $gitish->cat_async($oid, $cb, $arg); + $gitish->{async_cat} //= do { + # read-only end of pipe (Gcf2Client is write-only end) + my $self = bless { gitish => $gitish }, __PACKAGE__; + $gitish->{in}->blocking(0); + $self->SUPER::new($gitish->{in}, EPOLLIN|EPOLLET); + \undef; # this is a true ref() + }; +} + +# this is safe to call inside $cb, but not guaranteed to enqueue +# returns true if successful, undef if not. +sub git_async_prefetch { + my ($git, $oid, $cb, $arg) = @_; + if ($GCF2C) { + if ($GCF2C->{async_cat} && !$GCF2C->{wbuf}) { + $oid .= " $git->{git_dir}"; + return $GCF2C->cat_async($oid, $cb, $arg); + } + } elsif ($git->{async_cat} && (my $inflight = $git->{inflight})) { + # we could use MAX_INFLIGHT here w/o the halving, + # but lets not allow one client to monopolize a git process + if (@$inflight < int(PublicInbox::Git::MAX_INFLIGHT/2)) { + print { $git->{out} } $oid, "\n" or + $git->fail("write error: $!"); + return push(@$inflight, $oid, $cb, $arg); + } + } + undef; } 1;