This ought to improve overall performance with multiple clients.
Single-client performance suffers a tiny bit due to extra
syscall overhead from epoll.
This also makes the existing async interface easier to use,
since calling cat_async_begin is no longer required.
lib/PublicInbox/Filter/Vger.pm
lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/Git.pm
+lib/PublicInbox/GitAsyncCat.pm
lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/GzipFilter.pm
lib/PublicInbox/HTTP.pm
}
}
-sub _cat_async_step ($$) {
+sub cat_async_step ($$) {
my ($self, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 2;
my ($cb, $arg) = splice(@$inflight, 0, 2);
my ($self) = @_;
my $inflight = delete $self->{inflight} or return;
while (scalar(@$inflight)) {
- _cat_async_step($self, $inflight);
+ cat_async_step($self, $inflight);
}
}
# returns true if there are pending "git cat-file" processes
sub cleanup {
my ($self) = @_;
+ cat_async_wait($self);
_destroy($self, qw(--batch in out pid));
_destroy($self, qw(--batch-check in_c out_c pid_c err_c));
!!($self->{pid} || $self->{pid_c});
sub cat_async ($$$;$) {
my ($self, $oid, $cb, $arg) = @_;
- my $inflight = $self->{inflight} or die 'BUG: not in async';
+ my $inflight = $self->{inflight} // cat_async_begin($self);
if (scalar(@$inflight) >= MAX_INFLIGHT) {
- _cat_async_step($self, $inflight);
+ cat_async_step($self, $inflight);
}
print { $self->{out} } $oid, "\n" or fail($self, "write error: $!");
my ($self) = @_;
my $modified = 0;
my $fh = popen($self, qw(rev-parse --branches));
- cat_async_begin($self);
local $/ = "\n";
while (my $oid = <$fh>) {
chomp $oid;
--- /dev/null
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This parses the output pipe of "git cat-file --batch"
+#
+# Note: this does NOT set the non-blocking flag, we expect `git cat-file'
+# to be a local process, and git won't start writing a blob until it's
+# fully read. So minimize context switching and read as much as possible
+# and avoid holding a buffer in our heap any longer than it has to live.
+package PublicInbox::GitAsyncCat;
+use strict;
+use parent qw(PublicInbox::DS Exporter);
+use fields qw(git);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+our @EXPORT = qw(git_async_msg);
+
+# Constructor: registers the read side of $git's batch pipe ($git->{in})
+# with PublicInbox::DS for edge-triggered read events and remembers
+# $git so event_step can drive it.
+sub new {
+	my ($class, $git) = @_;
+
+	# batch_prepare runs before $git->{in} is handed to the parent
+	# constructor; presumably it is what opens that handle.
+	$git->batch_prepare;
+
+	my $self = fields::new($class);
+	$self->SUPER::new($git->{in}, EPOLLIN|EPOLLET);
+	$self->{git} = $git;
+	return $self;
+}
+
+# Read-event handler (presumably invoked by the PublicInbox::DS event
+# loop, and again after ->requeue).  Each call completes at most one
+# queued cat_async request via $git->cat_async_step.
+sub event_step {
+	my ($self) = @_;
+	my $git = $self->{git} or return; # ->close-ed
+
+	# {inflight} is deleted by cat_async_wait (which cleanup calls),
+	# so it may be undef here.  Dereferencing undef as an array under
+	# "use strict" dies, so check for definedness first.
+	my $inflight = $git->{inflight};
+	if ($inflight && @$inflight) {
+		$git->cat_async_step($inflight);
+
+		# We registered with EPOLLET, so ask to be called again
+		# while requests remain queued or (presumably) unparsed
+		# output is still sitting in the --batch read buffer.
+		$self->requeue if @$inflight || length(${$git->{'--batch'}});
+	}
+}
+
+# Drop our reference to the git object first, so that any event_step
+# call arriving afterwards returns immediately, then defer to the
+# parent class to do the actual close.
+sub close {
+	my $self = shift;
+	delete $self->{git};
+	return $self->SUPER::close; # PublicInbox::DS::close
+}
+
+# Queue an asynchronous lookup of $smsg->{blob} on the inbox's git
+# object, passing $cb and $arg through to cat_async.
+# Returns the inbox's GitAsyncCat watcher (a blessed ref), creating and
+# caching it in $ibx->{async_cat} on first use.
+sub git_async_msg ($$$$) {
+	my ($ibx, $smsg, $cb, $arg) = @_;
+	my $git = $ibx->git;
+	$git->cat_async($smsg->{blob}, $cb, $arg);
+
+	# only one watcher per inbox; reuse it if it already exists
+	$ibx->{async_cat} //= __PACKAGE__->new($ibx->{git});
+}
+
+1;
use PublicInbox::EmlContentFoo qw(parse_content_disposition);
use PublicInbox::DS qw(now);
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use PublicInbox::GitAsyncCat;
use Text::ParseWords qw(parse_line);
use Errno qw(EAGAIN);
+
my $Address;
for my $mod (qw(Email::Address::XS Mail::Address)) {
eval "require $mod" or next;
\$ret;
}
-sub uid_fetch_cb { # called by git->cat_async
+# Schedule exactly one more long_step call for this client once its
+# pending writes have drained.
+sub requeue_once ($) {
+	my ($self) = @_;
+
+	# The DEFLATE context is shared by every COMPRESS user, so it
+	# must be flushed now or clients could see each other's data.
+	$self->zflush;
+
+	# Avoid recursion: append long_step to the write buffer so it
+	# runs ASAP after whatever is already queued.  Create wbuf if
+	# it does not exist yet.
+	my $wbuf = $self->{wbuf} //= [];
+	push @$wbuf, \&long_step;
+
+	# If $cb already put something in wbuf there is nothing to
+	# rearm; only requeue when long_step is the sole entry.
+	$self->requeue if scalar(@$wbuf) == 1;
+}
+
+sub uid_fetch_cb { # called by git->cat_async via git_async_msg
my ($bref, $oid, $type, $size, $fetch_m_arg) = @_;
my ($self, undef, $ibx, undef, undef, $msgs, $want) = @$fetch_m_arg;
my $smsg = shift @$msgs or die 'BUG: no smsg';
if (!defined($oid)) {
# it's possible to have TOCTOU if an admin runs
# public-inbox-(edit|purge), just move onto the next message
- return unless defined $want->{-seqno};
+ return requeue_once($self) unless defined $want->{-seqno};
$bref = dummy_message($smsg->{num}, $ibx);
} else {
$smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid";
partial_emit($self, $partial, $eml);
}
$self->msg_more(")\r\n");
+ requeue_once($self);
}
sub uid_fetch_m { # long_response
}
$$beg = $msgs->[-1]->{num} + 1;
}
- my $git = $ibx->git;
- $git->cat_async_begin; # TODO: actually make async
- $git->cat_async($msgs->[0]->{blob}, \&uid_fetch_cb, \@_);
- $git->cat_async_wait;
- 1;
+ git_async_msg($ibx, $msgs->[0], \&uid_fetch_cb, \@_);
}
sub cmd_status ($$$;@) {
my $seq = $want->{-seqno}++;
my $cur_num = $msgs->[0]->{num};
if ($cur_num == $seq) { # as expected
- my $git = $ibx->git;
- $git->cat_async_begin; # TODO: actually make async
- $git->cat_async($msgs->[0]->{blob}, \&uid_fetch_cb, \@_);
- $git->cat_async_wait;
+ git_async_msg($ibx, $msgs->[0], \&uid_fetch_cb, \@_);
} elsif ($cur_num > $seq) {
# send dummy messages until $seq catches up to $cur_num
my $smsg = bless { num => $seq, ts => 0 }, 'PublicInbox::Smsg';
unshift @$msgs, $smsg;
my $bref = dummy_message($seq, $ibx);
uid_fetch_cb($bref, undef, undef, undef, \@_);
+ $smsg; # blessed response since uid_fetch_cb requeues
} else { # should not happen
die "BUG: cur_num=$cur_num < seq=$seq";
}
- 1; # more messages on the way
}
sub cmd_fetch ($$$;@) {
} elsif ($more) { # $self->{wbuf}:
$self->update_idle_time;
- # COMPRESS users all share the same DEFLATE context.
- # Flush it here to ensure clients don't see
- # each other's data
- $self->zflush;
-
- # no recursion, schedule another call ASAP, but only after
- # all pending writes are done. autovivify wbuf:
- my $new_size = push(@{$self->{wbuf}}, \&long_step);
-
- # wbuf may be populated by $cb, no need to rearm if so:
- $self->requeue if $new_size == 1;
+ # control passed to $more may be a GitAsyncCat object
+ requeue_once($self) if !ref($more);
} else { # all done!
delete $self->{long_cb};
my $elapsed = now() - $t0;
my $cleanup_avail = -1; # 0, or 1
my $have_devel_peek;
my $CLEANUP = {}; # string(inbox) -> inbox
+
+# Close the async cat-file watcher (if one was created) and then clean
+# up the inbox's git object.  Does nothing when no git object has been
+# instantiated.  Returns the result of $git->cleanup.
+sub git_cleanup ($) {
+	my ($self) = @_;
+	my $git = $self->{git};
+	return unless $git;
+
+	# the watcher goes first, before the git object is cleaned up
+	my $async_cat = delete $self->{async_cat};
+	$async_cat->close if $async_cat;
+
+	$git->cleanup;
+}
+
sub cleanup_task () {
$cleanup_timer = undef;
my $next = {};
# refcnt is zero when tmp is out-of-scope
}
}
- if (my $git = $ibx->{git}) {
- $again = $git->cleanup;
- }
+ git_cleanup($ibx);
if (my $gits = $ibx->{-repo_objs}) {
foreach my $git (@$gits) {
$again = 1 if $git->cleanup;
my $cur = $self->{-max_git_epoch};
my $changed = git($self)->alternates_changed;
if (!defined($cur) || $changed) {
- $self->git->cleanup if $changed;
+ git_cleanup($self) if $changed;
my $gits = "$self->{inboxdir}/git";
if (opendir my $dh, $gits) {
my $max = -1;
my $arg = { 'foo' => 'bar' };
my $res = [];
my $missing = [];
- $gcf->cat_async_begin;
$gcf->cat_async($oid, sub {
my ($bref, $oid_hex, $type, $size, $arg) = @_;
$res = [ @_ ];
diag xqx([qw(git diff), "$tmpdir/mime", "$tmpdir/eml"]);
}
};
-$git->cat_async_begin;
my $t = timeit(1, sub {
while (<$fh>) {
my ($oid, $type) = split / /;
is_deeply($eml_cmp, $mime_cmp, "$inboxdir $oid match");
}
};
-$git->cat_async_begin;
my $t = timeit(1, sub {
while (<$fh>) {
my ($oid, $type) = split / /;
};
my $t = timeit(1, sub {
- $git->cat_async_begin;
my ($blob, $type);
while (<$fh>) {
($blob, $type) = split / /;
$dig->add($$bref);
};
my $cat = $git->popen(@cat);
- $git->cat_async_begin;
while (<$cat>) {
my ($oid, undef, undef) = split(/ /);
}
my $fh = $git->popen(@cat);
-$git->cat_async_begin;
while (<$fh>) {
my ($oid, $type) = split / /;
next if $type ne 'blob';
$obuf = '';
};
-$git->cat_async_begin;
my $t = timeit(1, sub {
$ctx->{obuf} = \$obuf;
$ctx->{mhref} = '../';