1 # Copyright (C) all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
4 # internal class used by PublicInbox::Git + PublicInbox::DS
5 # This parses the output pipe of "git cat-file --batch"
6 package PublicInbox::GitAsyncCat;
8 use parent qw(PublicInbox::DS Exporter);
10 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
11 our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check);
12 use PublicInbox::Git ();
14 our $GCF2C; # singleton PublicInbox::Gcf2Client
# Detach the git object from this watcher; the guarded section only runs
# when one was still attached.
18 if (my $git = delete $self->{git}) {
# Delegate to the parent class's close method.
21 $self->SUPER::close; # PublicInbox::DS::close
# Nothing to do when no git object is attached to this watcher.
26 my $git = $self->{git} or return;
# Close this watcher when git's {in} handle is not the handle stored in
# {sock}.  The comparison is numeric (reference identity); the differing
# fallbacks (0 vs 1) ensure two missing handles never compare equal.
27 return $self->close if ($git->{in} // 0) != ($self->{sock} // 1);
28 my $inflight = $git->{inflight};
# Only step when the in-flight queue exists and is non-empty.
29 if ($inflight && @$inflight) {
30 $git->cat_async_step($inflight);
# Same identity check as above, repeated after the step -- presumably
# because cat_async_step can replace or drop {in}; confirm against
# PublicInbox::Git.
33 if (($git->{in} // 0) != ($self->{sock} // 1)) {
# Requests are still queued, or an {rbuf} key is still present
# (presumably unread buffered output -- verify in PublicInbox::Git).
35 } elsif (@$inflight || exists $git->{rbuf}) {
36 # ok, more to do, requeue for fairness
# Non-blocking reap: with WNOHANG, waitpid returns a positive PID only
# if that child has already exited.
39 } elsif ((my $pid = waitpid($git->{pid}, WNOHANG)) > 0) {
40 # May happen if the child process is killed by a BOFH
# $? holds the wait status set by the successful waitpid above.
43 warn "E: git $pid exited with \$?=$?\n";
# `//=` evaluates the do-block only while $git->{async_cat} is undefined,
# so the watcher below is created at most once until that slot is cleared.
50 $git->{async_cat} //= do {
# The watcher object keeps a reference to the git object it serves.
51 my $self = bless { git => $git }, __PACKAGE__;
# Make the read handle non-blocking before registering it.
52 $git->{in}->blocking(0);
# Register $git->{in} with the parent class using the EPOLLIN|EPOLLET
# event flags.
53 $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET);
# The do-block's value: a reference to undef, stored in {async_cat} as a
# defined, true marker.
54 \undef; # this is a true ref()
# ibx_async_cat($ibx, $oid, $cb, $arg)
# Queue an asynchronous lookup of $oid for the inbox-like object $ibx.
# $cb and $arg are handed through unchanged to whichever backend takes
# the request:
#   - $GCF2C->gcf2_async, when $ibx has no {topdir}, $git has no {-tmp}
#     and the Gcf2Client singleton is usable;
#   - $git->cat_async otherwise.
58 sub ibx_async_cat ($$$$) {
59 my ($ibx, $oid, $cb, $arg) = @_;
# Prefer an already-populated {git} field; fall back to the ->git method.
60 my $git = $ibx->{git} // $ibx->git;
61 # {topdir} means ExtSearch (likely [extindex "all"]) with potentially
62 # 100K alternates. git(1) has a proposed patch for 100K alternates:
63 # <https://lore.kernel.org/git/20210624005806.12079-1-e@80x24.org/>
64 if (!defined($ibx->{topdir}) && !defined($git->{-tmp}) &&
# Load the client class at run time rather than at compile time.
66 require PublicInbox::Gcf2Client;
67 PublicInbox::Gcf2Client::new();
68 } // 0)) { # 0: do not retry if libgit2 or Inline::C are missing
# The request is passed as a reference to the line "<oid> <git_dir>\n".
69 $GCF2C->gcf2_async(\"$oid $git->{git_dir}\n", $cb, $arg);
71 } else { # read-only end of git-cat-file pipe
72 $git->cat_async($oid, $cb, $arg);
# async_check($ibx, $oidish, $cb, $arg)
# Queue $oidish through $git->check_async, passing $cb and $arg along
# unchanged, and make sure a PublicInbox::GitAsyncCheck watcher is
# registered on $git->{in_c}.
77 sub async_check ($$$$) {
78 my ($ibx, $oidish, $cb, $arg) = @_;
# Prefer an already-populated {git} field; fall back to the ->git method.
79 my $git = $ibx->{git} // $ibx->git;
80 $git->check_async($oidish, $cb, $arg);
# `//=` runs the do-block only while {async_chk} is undefined, so the
# watcher is created at most once until that slot is cleared.
81 $git->{async_chk} //= do {
82 my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck';
# Make the handle non-blocking before registering it.
83 $git->{in_c}->blocking(0);
# NOTE: SUPER:: is resolved from the package this code is compiled in,
# not from the class $self is blessed into.
84 $self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET);
# Value stored in {async_chk}: a reference to undef, i.e. a defined,
# true marker.
85 \undef; # this is a true ref()
89 # This is safe to call inside $cb, but is not guaranteed to enqueue.
90 # Returns true if successful, undef if not.  For fairness, we only
91 # prefetch if there are no in-flight requests.
# Arguments mirror ibx_async_cat: ($ibx, $oid, $cb, $arg).
92 sub ibx_async_prefetch {
93 my ($ibx, $oid, $cb, $arg) = @_;
# Gcf2 path: taken only when $ibx has no {topdir} and the $GCF2C
# singleton is set.
95 if (!defined($ibx->{topdir}) && $GCF2C) {
# A missing {inflight} is treated as an empty queue.
96 if (!@{$GCF2C->{inflight} // []}) {
# Build the request line "<oid> <git_dir>\n" in place.
97 $oid .= " $git->{git_dir}\n";
98 return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true
# git path: requires that {async_cat} is already set and that an
# {inflight} array exists.
100 } elsif ($git->{async_cat} && (my $inflight = $git->{inflight})) {
# Write the request line to $git->{out}; a failed print is reported
# through $git->fail together with the errno text.
102 print { $git->{out} } $oid, "\n" or
103 $git->fail("write error: $!");
# Append the ($oid, $cb, $arg) triple to the queue; push returns the new
# element count, which is always true here.
104 return push(@$inflight, $oid, $cb, $arg);
# Subclass of PublicInbox::GitAsyncCat (via @ISA) whose handler works on
# the "_c" fields of the git object: {in_c}, {inflight_c}, {rbuf_c} and
# {pid_c}.
111 package PublicInbox::GitAsyncCheck;
113 our @ISA = qw(PublicInbox::GitAsyncCat);
114 use POSIX qw(WNOHANG);
115 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
# Nothing to do when no git object is attached to this watcher.
119 my $git = $self->{git} or return;
# Close this watcher when git's {in_c} handle is not the handle stored in
# {sock}.  The comparison is numeric (reference identity); the differing
# fallbacks (0 vs 1) ensure two missing handles never compare equal.
120 return $self->close if ($git->{in_c} // 0) != ($self->{sock} // 1);
121 my $inflight = $git->{inflight_c};
# Only step when the in-flight queue exists and is non-empty.
122 if ($inflight && @$inflight) {
123 $git->check_async_step($inflight);
# Same identity check as above, repeated after the step -- presumably
# because check_async_step can replace or drop {in_c}; confirm against
# PublicInbox::Git.
126 if (($git->{in_c} // 0) != ($self->{sock} // 1)) {
# Requests are still queued, or an {rbuf_c} key is still present
# (presumably unread buffered output -- verify in PublicInbox::Git).
128 } elsif (@$inflight || exists $git->{rbuf_c}) {
129 # ok, more to do, requeue for fairness
# Non-blocking reap: with WNOHANG, waitpid returns a positive PID only
# if that child has already exited.
132 } elsif ((my $pid = waitpid($git->{pid_c}, WNOHANG)) > 0) {
133 # May happen if the child process is killed by a BOFH
# Drop the stored PID now that waitpid has reported it.
135 delete $git->{pid_c};
# $? holds the wait status set by the successful waitpid above.
136 warn "E: git $pid exited with \$?=$?\n";