# Copyright (C) 2020 all contributors
# License: AGPL-3.0+
#
# internal class used by PublicInbox::Git + PublicInbox::DS
# This parses the output pipe of "git cat-file --batch"
#
# Note: this does NOT set the non-blocking flag, we expect `git cat-file'
# to be a local process, and git won't start writing a blob until it's
# fully read. So minimize context switching and read as much as possible
# and avoid holding a buffer in our heap any longer than it has to live.
package PublicInbox::GitAsyncCat;
use strict;
use parent qw(PublicInbox::DS Exporter);
use POSIX qw(WNOHANG);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
our @EXPORT = qw(git_async_cat git_async_prefetch);
use PublicInbox::Git ();
our $GCF2C; # singleton PublicInbox::Gcf2Client
sub close {
my ($self) = @_;
if (my $gitish = delete $self->{gitish}) {
PublicInbox::Git::cat_async_abort($gitish);
}
$self->SUPER::close; # PublicInbox::DS::close
}
sub event_step {
my ($self) = @_;
my $gitish = $self->{gitish} or return;
return $self->close if ($gitish->{in} // 0) != ($self->{sock} // 1);
my $inflight = $gitish->{inflight};
if ($inflight && @$inflight) {
$gitish->cat_async_step($inflight);
# child death?
if (($gitish->{in} // 0) != ($self->{sock} // 1)) {
$self->close;
} elsif (@$inflight || exists $gitish->{cat_rbuf}) {
# ok, more to do, requeue for fairness
$self->requeue;
}
} elsif ((my $pid = waitpid($gitish->{pid}, WNOHANG)) > 0) {
# May happen if the child process is killed by a BOFH
# (or segfaults)
delete $gitish->{pid};
warn "E: gitish $pid exited with \$?=$?\n";
$self->close;
}
}
sub git_async_cat ($$$$) {
my ($git, $oid, $cb, $arg) = @_;
my $gitish = $GCF2C //= eval {
require PublicInbox::Gcf2;
require PublicInbox::Gcf2Client;
PublicInbox::Gcf2Client::new();
} // 0; # 0: do not retry if libgit2 or Inline::C are missing
if ($gitish) { # Gcf2 active, {inflight} may be unset due to errors
$GCF2C->{inflight} or
$gitish = $GCF2C = PublicInbox::Gcf2Client::new();
$oid .= " $git->{git_dir}";
} else {
$gitish = $git;
}
$gitish->cat_async($oid, $cb, $arg);
$gitish->{async_cat} //= do {
# read-only end of pipe (Gcf2Client is write-only end)
my $self = bless { gitish => $gitish }, __PACKAGE__;
$self->SUPER::new($gitish->{in}, EPOLLIN|EPOLLET);
\undef; # this is a true ref()
};
}
# this is safe to call inside $cb, but not guaranteed to enqueue
# returns true if successful, undef if not.
sub git_async_prefetch {
my ($git, $oid, $cb, $arg) = @_;
if ($GCF2C) {
if ($GCF2C->{async_cat} && !$GCF2C->{wbuf}) {
$oid .= " $git->{git_dir}";
return $GCF2C->cat_async($oid, $cb, $arg);
}
} elsif ($git->{async_cat} && (my $inflight = $git->{inflight})) {
# we could use MAX_INFLIGHT here w/o the halving,
# but lets not allow one client to monopolize a git process
if (@$inflight < int(PublicInbox::Git::MAX_INFLIGHT/2)) {
print { $git->{out} } $oid, "\n" or
$git->fail("write error: $!");
return push(@$inflight, $oid, $cb, $arg);
}
}
undef;
}
1;