-# Copyright (C) 2018-2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# This interface wraps and mimics PublicInbox::Import
use PublicInbox::InboxWritable;
use PublicInbox::OverIdx;
use PublicInbox::Msgmap;
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Spawn qw(spawn popen_rd run_die);
use PublicInbox::SearchIdx qw(log2stack crlf_adjust is_ancestor check_size
is_bad_blob);
use IO::Handle; # ->autoflush
my $OID = qr/[a-f0-9]{40,}/;
# an estimate of the ratio of the post-packed size to the raw uncompressed size
-my $PACKING_FACTOR = 0.4;
+our $PACKING_FACTOR = 0.4;
# SATA storage lags behind what CPUs are capable of, so relying on
# nproc(1) can be misleading and having extra Xapian shards is a
delete $ibx->{search};
$srch->{nshard} // 0
} else { # ExtSearchIdx
- $self->{nshard} // do {
- if ($self->xdb_sharded) {
- $self->{nshard} // die 'BUG: {nshard} unset';
- } else {
- 0;
- }
- }
+ $self->{nshard} ||= scalar($self->xdb_shards_flat);
}
}
sub parallel_init ($$) { # sets {parallel} to 0 when $indexlevel is 'basic'; otherwise leaves it untouched
my ($self, $indexlevel) = @_;
- if (($indexlevel // 'full') eq 'basic') {
- $self->{parallel} = 0;
- } else {
- pipe(my ($r, $w)) or die "pipe failed: $!";
- # pipe for barrier notifications doesn't need to be big,
- # 1031: F_SETPIPE_SZ
- fcntl($w, 1031, 4096) if $^O eq 'linux';
- $self->{bnote} = [ $r, $w ];
- $w->autoflush(1);
- }
+ $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic'; # an undef $indexlevel is treated as 'full'
}
# idempotent
}
}
-sub barrier_init {
- my ($self, $n) = @_;
- $self->{bnote} or return;
- --$n;
- my $barrier = { map { $_ => 1 } (0..$n) };
-}
-
-sub barrier_wait {
- my ($self, $barrier) = @_;
- my $bnote = $self->{bnote} or return;
- my $r = $bnote->[0];
- while (scalar keys %$barrier) {
- defined(my $l = readline($r)) or die "EOF on barrier_wait: $!";
- $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
- delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
- }
-}
-
# public
sub checkpoint ($;$) {
my ($self, $wait) = @_;
$self->{oidx}->commit_lazy;
# Now deal with Xapian
- if ($wait) {
- my $barrier = barrier_init($self, scalar @$shards);
-
- # each shard needs to issue a barrier command
- $_->shard_barrier for @$shards;
- # wait for each Xapian shard
- barrier_wait($self, $barrier);
- } else {
- $_->shard_commit for @$shards;
+ # start commit_txn_lazy asynchronously on all parallel shards
+ # (non-parallel waits here)
+ $_->ipc_do('commit_txn_lazy') for @$shards;
+
+ # transactions started on parallel shards,
+ # wait for them by issuing an echo command (echo can only
+ # run after commit_txn_lazy is done)
+ if ($wait && $self->{parallel}) {
+ my $i = 0;
+ for my $shard (@$shards) {
+ my $echo = $shard->ipc_do('echo', $i);
+ $echo == $i or die <<"";
+shard[$i] bad echo:$echo != $i waiting for txn commit
+
+ ++$i;
+ }
}
my $midx = $self->{midx}; # misc index
eval { $self->{oidx}->dbh_close };
$err .= "over close: $@\n" if $@;
delete $self->{midx};
- delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
$self->lock_release(!!$nbytes) if $shards;
my ($self, $epoch) = @_;
my $git_dir = "$self->{ibx}->{inboxdir}/git/$epoch.git";
PublicInbox::Import::init_bare($git_dir);
- my @cmd = (qw/git config/, "--file=$git_dir/config",
- 'include.path', '../../all.git/config');
- PublicInbox::Import::run_die(\@cmd);
+ run_die([qw(git config), "--file=$git_dir/config",
+ qw(include.path ../../all.git/config)]);
fill_alternates($self, $epoch);
$git_dir
}
sub atfork_child { # calls shard_atfork_child on every entry of {idx_shards} and atfork_child on {im}, when those are set
my ($self) = @_;
- if (my $shards = $self->{idx_shards}) {
- $_->atfork_child foreach @$shards;
+ if (my $older_siblings = $self->{idx_shards}) {
+ $_->shard_atfork_child for @$older_siblings;
}
if (my $im = $self->{im}) {
$im->atfork_child;
}
- die "unexpected mm" if $self->{mm};
- close $self->{bnote}->[0] or die "close bnote[0]: $!\n";
- $self->{bnote}->[1];
+ die "BUG: unexpected mm" if $self->{mm}; # a set {mm} at this point is treated as a programming error
}
sub reindex_checkpoint ($$) {
$self->done; # release lock
}
- if (my $pr = $sync->{-opt}->{-progress}) {
+ if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
$pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}}));
}
$mm_tmp->atfork_parent if $mm_tmp;
}
+sub index_finalize ($$) { # common exit path of index_oid and unindex_oid: bumps {nidx} and records the current commit
+ my ($arg, $index) = @_; # $index: index_oid passes 1, unindex_oid passes 0
+ ++$arg->{self}->{nidx}; # incremented on every call, including the early is_bad_blob returns
+ if (defined(my $cur = $arg->{cur_cmt})) {
+ ${$arg->{latest_cmt}} = $cur; # {latest_cmt} is a scalar ref; store the commit through it
+ } elsif ($index) { # only the indexing path requires {cur_cmt}
+ die 'BUG: {cur_cmt} missing';
+ } # else { unindexing @leftovers doesn't set {cur_cmt}
+}
+
sub index_oid { # cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
- return if is_bad_blob($oid, $type, $size, $arg->{oid});
+ is_bad_blob($oid, $type, $size, $arg->{oid}) and
+ return index_finalize($arg, 1); # size == 0 purged returns here
my $self = $arg->{self};
local $self->{current_info} = "$self->{current_info} $oid";
- return if $size == 0; # purged
my ($num, $mid0);
my $eml = PublicInbox::Eml->new($$bref);
my $mids = mids($eml);
if (do_idx($self, $bref, $eml, $smsg)) {
${$arg->{need_checkpoint}} = 1;
}
- ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+ index_finalize($arg, 1);
}
# only update last_commit for $i on reindex iff newer than current
-d $git_dir or next; # missing epochs are fine
my $git = PublicInbox::Git->new($git_dir);
my $unit = { git => $git, epoch => $i };
+ my $tip;
if ($reindex_heads) {
- $head = $reindex_heads->[$i] or next;
+ $tip = $head = $reindex_heads->[$i] or next;
+ } else {
+ $tip = $git->qx(qw(rev-parse -q --verify), $head);
+ next if $?; # new repo
+ chomp $tip;
}
- chomp(my $tip = $git->qx(qw(rev-parse -q --verify), $head));
- next if $?; # new repo
-
my $range = log_range($sync, $unit, $tip) or next;
# can't use 'rev-list --count' if we use --diff-filter
$pr->("$pfx $i.git counting $range ... ") if $pr;
my ($self, $oid, $mid) = @_;
my @removed = $self->{oidx}->remove_oid($oid, $mid);
for my $num (@removed) {
- my $idx = idx_shard($self, $num);
- $idx->shard_remove($num);
+ idx_shard($self, $num)->ipc_do('xdb_remove', $num);
}
}
sub unindex_oid ($$;$) { # git->cat_async callback
- my ($bref, $oid, $type, $size, $sync) = @_;
- return if is_bad_blob($oid, $type, $size, $sync->{oid});
- my $self = $sync->{self};
+ my ($bref, $oid, $type, $size, $arg) = @_;
+ is_bad_blob($oid, $type, $size, $arg->{oid}) and
+ return index_finalize($arg, 0);
+ my $self = $arg->{self};
local $self->{current_info} = "$self->{current_info} $oid";
- my $unindexed = $sync->{in_unindex} ? $sync->{unindexed} : undef;
+ my $unindexed = $arg->{in_unindex} ? $arg->{unindexed} : undef;
my $mm = $self->{mm};
my $mids = mids(PublicInbox::Eml->new($bref));
undef $$bref;
}
unindex_oid_aux($self, $oid, $mid);
}
+ index_finalize($arg, 0);
}
sub git { $_[0]->{ibx}->git } # returns whatever the git method of {ibx} returns
return if $before == $after;
# ensure any blob can no longer be accessed via dumb HTTP
- PublicInbox::Import::run_die(['git',
- "--git-dir=$unit->{git}->{git_dir}",
+ run_die(['git', "--git-dir=$unit->{git}->{git_dir}",
qw(-c gc.reflogExpire=now gc --prune=all --quiet)]);
}
$opt //= {};
return xapian_only($self, $opt) if $opt->{xapian_only};
- my $pr = $opt->{-progress};
my $epoch_max;
- my $latest = $self->{ibx}->git_dir_latest(\$epoch_max);
- return unless defined $latest;
+ my $latest = $self->{ibx}->git_dir_latest(\$epoch_max) // return;
+ if ($opt->{'fast-noop'}) { # nanosecond (st_ctim) comparison
+ use Time::HiRes qw(stat);
+ if (my @mm = stat("$self->{ibx}->{inboxdir}/msgmap.sqlite3")) {
+ my $c = $mm[10]; # 10 = ctime (nsec NV)
+ my @hd = stat("$latest/refs/heads");
+ my @pr = stat("$latest/packed-refs");
+ return if $c > ($hd[10] // 0) && $c > ($pr[10] // 0);
+ }
+ }
+ my $pr = $opt->{-progress};
my $seq = $opt->{sequential_shard};
my $art_beg; # the NNTP article number we start xapian_only at
my $idxlevel = $self->{ibx}->{indexlevel};