]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/GitAsyncCat.pm
dc97af16c09ed3e06ba1b727a5563dd3f32ea251
[public-inbox.git] / lib / PublicInbox / GitAsyncCat.pm
1 # Copyright (C) 2020 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3 #
4 # internal class used by PublicInbox::Git + PublicInbox::DS
5 # This parses the output pipe of "git cat-file --batch"
6 package PublicInbox::GitAsyncCat;
7 use strict;
8 use parent qw(PublicInbox::DS Exporter);
9 use POSIX qw(WNOHANG);
10 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
11 our @EXPORT = qw(git_async_cat git_async_prefetch);
12 use PublicInbox::Git ();
13
14 our $GCF2C; # singleton PublicInbox::Gcf2Client
15
# Detach the git-ish object from this watcher, pass it to
# PublicInbox::Git::cat_async_abort if one was attached, and then
# defer to the parent class for the actual close.
sub close {
	my ($self) = @_;

	# delete first so the watcher no longer references $gitish
	my $gitish = delete $self->{gitish};
	PublicInbox::Git::cat_async_abort($gitish) if $gitish;

	$self->SUPER::close; # PublicInbox::DS::close
}
24
# Event callback for the read end of the cat-file pipe (presumably
# invoked by the PublicInbox::DS event loop for the handle registered
# in git_async_cat -- confirm against PublicInbox::DS).
#
# - does nothing once {gitish} has been detached (see close)
# - closes this watcher when $gitish->{in} is no longer our {sock}
# - otherwise lets $gitish consume one response via cat_async_step and
#   requeues itself while requests or buffered data remain
# - with nothing in flight, checks (non-blocking) whether the child
#   process exited and closes if so
sub event_step {
	my ($self) = @_;
	my $gitish = $self->{gitish} or return;

	# The 0 and 1 fallbacks guarantee a mismatch when either side
	# (or both) is missing, so a dropped handle always closes us.
	return $self->close if ($gitish->{in} // 0) != ($self->{sock} // 1);

	my $inflight = $gitish->{inflight};
	if ($inflight && @$inflight) {
		$gitish->cat_async_step($inflight);

		# child death?
		if (($gitish->{in} // 0) != ($self->{sock} // 1)) {
			$self->close;
		} elsif (@$inflight || exists $gitish->{cat_rbuf}) {
			# ok, more to do, requeue for fairness
			$self->requeue;
		}
	} elsif (defined(my $child = $gitish->{pid})) {
		# Only poll a known pid: an undefined pid numifies to 0 and
		# waitpid(0, ...) would reap *any* child in our process
		# group, stealing its exit status and misreporting it below.
		my $pid = waitpid($child, WNOHANG);
		if ($pid > 0) {
			# May happen if the child process is killed by a BOFH
			# (or segfaults)
			delete $gitish->{pid};
			warn "E: gitish $pid exited with \$?=$?\n";
			$self->close;
		}
	}
}
48
# Queue an asynchronous lookup of $oid and make sure the reading end of
# the backend's pipe is watched by this class.
#
# $git - object providing {git_dir}; also the backend when Gcf2 is
#        unavailable
# $oid - object ID to look up
# $cb, $arg - passed through unchanged to ->cat_async
#
# The Gcf2 client singleton ($GCF2C) is preferred; for it the request
# line is "$oid $git->{git_dir}".
sub git_async_cat ($$$$) {
	my ($git, $oid, $cb, $arg) = @_;

	# Load the shared Gcf2 client on first use.  A failed attempt
	# stores 0 so loading is not attempted again.
	$GCF2C //= eval {
		require PublicInbox::Gcf2;
		require PublicInbox::Gcf2Client;
		PublicInbox::Gcf2Client::new();
	} // 0; # 0: do not retry if libgit2 or Inline::C are missing

	my $gitish;
	if ($GCF2C) { # Gcf2 active, {inflight} may be unset due to errors
		$GCF2C = PublicInbox::Gcf2Client::new()
			unless $GCF2C->{inflight};
		$gitish = $GCF2C;
		$oid = "$oid $git->{git_dir}";
	} else {
		$gitish = $git;
	}

	# must come before the watcher setup below, which uses {in}
	$gitish->cat_async($oid, $cb, $arg);

	$gitish->{async_cat} //= do {
		# read-only end of pipe (Gcf2Client is write-only end)
		my $rd = $gitish->{in};
		$rd->blocking(0);
		my $self = bless { gitish => $gitish }, __PACKAGE__;
		$self->SUPER::new($rd, EPOLLIN|EPOLLET);
		\undef; # placeholder only: a ref is always true
	};
}
72
# Opportunistically queue a lookup of $oid on a backend that is already
# being watched.  This is safe to call inside $cb, but it is not
# guaranteed to enqueue anything.
#
# Returns the result of the enqueue on success (the ->cat_async return
# value for Gcf2, the new length of the inflight queue for plain git)
# and undef when nothing was queued.
sub git_async_prefetch {
	my ($git, $oid, $cb, $arg) = @_;

	if ($GCF2C) {
		# Gcf2: only when already watched and {wbuf} is unset
		# (presumably: no buffered writes pending -- confirm)
		return undef unless $GCF2C->{async_cat} && !$GCF2C->{wbuf};
		return $GCF2C->cat_async("$oid $git->{git_dir}", $cb, $arg);
	}

	# plain git: needs an existing watcher and an inflight queue
	my $queue = $git->{async_cat} ? $git->{inflight} : undef;
	return undef unless $queue;

	# we could use MAX_INFLIGHT here w/o the halving,
	# but lets not allow one client to monopolize a git process
	my $limit = int(PublicInbox::Git::MAX_INFLIGHT() / 2);
	return undef if @$queue >= $limit;

	print { $git->{out} } $oid, "\n" or $git->fail("write error: $!");
	return push(@$queue, $oid, $cb, $arg);
}
93
94 1;