X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FGitAsyncCat.pm;h=2e0725a61b2248d0124c3a75e38befd97c76f55e;hb=f35d722d38e571458fc413b9f0d7ddd788ec4b98;hp=098101aed00843633eb239443a1a9c61f3cdf937;hpb=10ee3548084c125f20fe2c830faea2a43413be92;p=public-inbox.git diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index 098101ae..2e0725a6 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -1,51 +1,141 @@ -# Copyright (C) 2020 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # -# internal class used by PublicInbox::Git + Danga::Socket +# internal class used by PublicInbox::Git + PublicInbox::DS # This parses the output pipe of "git cat-file --batch" -# -# Note: this does NOT set the non-blocking flag, we expect `git cat-file' -# to be a local process, and git won't start writing a blob until it's -# fully read. So minimize context switching and read as much as possible -# and avoid holding a buffer in our heap any longer than it has to live. package PublicInbox::GitAsyncCat; -use strict; +use v5.12; use parent qw(PublicInbox::DS Exporter); -use fields qw(git); +use POSIX qw(WNOHANG); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -our @EXPORT = qw(git_async_cat); - -sub _add { - my ($class, $git) = @_; - my $self = fields::new($class); - $git->batch_prepare; - $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); - $self->{git} = $git; - \undef; # this is a true ref() +our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check); +use PublicInbox::Git (); + +our $GCF2C; # singleton PublicInbox::Gcf2Client + +sub close { + my ($self) = @_; + if (my $git = delete $self->{git}) { + $git->async_abort; + } + $self->SUPER::close; # PublicInbox::DS::close } sub event_step { my ($self) = @_; - my $git = $self->{git} or return; # ->close-ed + my $git = $self->{git} or return; + return $self->close if ($git->{in} // 0) != ($self->{sock} // 1); my $inflight = $git->{inflight}; if ($inflight && @$inflight) { $git->cat_async_step($inflight); - $self->requeue if @$inflight || exists $git->{cat_rbuf}; + + # child death? + if (($git->{in} // 0) != ($self->{sock} // 1)) { + $self->close; + } elsif (@$inflight || exists $git->{rbuf}) { + # ok, more to do, requeue for fairness + $self->requeue; + } + } elsif ((my $pid = waitpid($git->{pid}, WNOHANG)) > 0) { + # May happen if the child process is killed by a BOFH + # (or segfaults) + delete $git->{pid}; + warn "E: git $pid exited with \$?=$?\n"; + $self->close; } } -sub close { - my ($self) = @_; - if (my $git = delete $self->{git}) { - delete $git->{async_cat}; +sub watch_cat { + my ($git) = @_; + $git->{async_cat} //= do { + my $self = bless { git => $git }, __PACKAGE__; + $git->{in}->blocking(0); + $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); + \undef; # this is a true ref() + }; +} + +sub ibx_async_cat ($$$$) { + my ($ibx, $oid, $cb, $arg) = @_; + my $git = $ibx->{git} // $ibx->git; + # {topdir} means ExtSearch (likely [extindex "all"]) with potentially + # 100K alternates. git(1) has a proposed patch for 100K alternates: + # + if (!defined($ibx->{topdir}) && !defined($git->{-tmp}) && + ($GCF2C //= eval { + require PublicInbox::Gcf2Client; + PublicInbox::Gcf2Client::new(); + } // 0)) { # 0: do not retry if libgit2 or Inline::C are missing + $GCF2C->gcf2_async(\"$oid $git->{git_dir}\n", $cb, $arg); + \undef; + } else { # read-only end of git-cat-file pipe + $git->cat_async($oid, $cb, $arg); + watch_cat($git); } - $self->SUPER::close; # PublicInbox::DS::close } -sub git_async_cat ($$$$) { - my ($git, $oid, $cb, $arg) = @_; - $git->cat_async($oid, $cb, $arg); - $git->{async_cat} //= _add(__PACKAGE__, $git); +sub async_check ($$$$) { + my ($ibx, $oidish, $cb, $arg) = @_; + my $git = $ibx->{git} // $ibx->git; + $git->check_async($oidish, $cb, $arg); + $git->{async_chk} //= do { + my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck'; + $git->{in_c}->blocking(0); + $self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET); + \undef; # this is a true ref() + }; +} + +# this is safe to call inside $cb, but not guaranteed to enqueue +# returns true if successful, undef if not. For fairness, we only +# prefetch if there's no in-flight requests. +sub ibx_async_prefetch { + my ($ibx, $oid, $cb, $arg) = @_; + my $git = $ibx->git; + if (!defined($ibx->{topdir}) && $GCF2C) { + if (!@{$GCF2C->{inflight} // []}) { + $oid .= " $git->{git_dir}\n"; + return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true + } + } elsif ($git->{async_cat} && (my $inflight = $git->{inflight})) { + if (!@$inflight) { + print { $git->{out} } $oid, "\n" or + $git->fail("write error: $!"); + return push(@$inflight, $oid, $cb, $arg); + } + } + undef; +} + +1; +package PublicInbox::GitAsyncCheck; +use v5.12; +our @ISA = qw(PublicInbox::GitAsyncCat); +use POSIX qw(WNOHANG); +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); + +sub event_step { + my ($self) = @_; + my $git = $self->{git} or return; + return $self->close if ($git->{in_c} // 0) != ($self->{sock} // 1); + my $inflight = $git->{inflight_c}; + if ($inflight && @$inflight) { + $git->check_async_step($inflight); + + # child death? + if (($git->{in_c} // 0) != ($self->{sock} // 1)) { + $self->close; + } elsif (@$inflight || exists $git->{rbuf_c}) { + # ok, more to do, requeue for fairness + $self->requeue; + } + } elsif ((my $pid = waitpid($git->{pid_c}, WNOHANG)) > 0) { + # May happen if the child process is killed by a BOFH + # (or segfaults) + delete $git->{pid_c}; + warn "E: git $pid exited with \$?=$?\n"; + $self->close; + } } 1;