sub count_shards ($) {
my ($self) = @_;
- $self->{ibx} ? do {
+ if (my $ibx = $self->{ibx}) {
# always load existing shards in case core count changes:
# Also, shard count may change while -watch is running
- my $srch = $self->{ibx}->search or return 0;
- delete $self->{ibx}->{search};
+ my $srch = $ibx->search or return 0;
+ delete $ibx->{search};
$srch->{nshard} // 0
- } : $self->{nshard}; # self->{nshard} is for ExtSearchIdx
+ } else { # ExtSearchIdx
+ $self->{nshard} // do {
+ if ($self->xdb_sharded) {
+ $self->{nshard} // die 'BUG: {nshard} unset';
+ } else {
+ 0;
+ }
+ }
+ }
}
sub new {
$_->shard_commit for @$shards;
}
+ my $midx = $self->{midx}; # misc index
+ $midx->commit_txn if $midx;
+
# last_commit is special, don't commit these until
# Xapian shards are done:
$dbh->begin_work if $dbh;
$dbh->commit;
$dbh->begin_work;
}
+ $midx->begin_txn if $midx;
}
$self->{total_bytes} += $self->{transact_bytes};
$self->{transact_bytes} = 0;
}
eval { $self->{oidx}->dbh_close };
$err .= "over close: $@\n" if $@;
+ delete $self->{midx};
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
my ($self, $sync) = @_;
$self->git->async_wait_all;
+ $self->update_last_commit($sync);
${$sync->{need_checkpoint}} = 0;
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
sub index_oid { # cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
+ my $self = $arg->{self};
+ local $self->{current_info} = "$self->{current_info} $oid";
return if $size == 0; # purged
my ($num, $mid0);
my $eml = PublicInbox::Eml->new($$bref);
my $mids = mids($eml);
my $chash = content_hash($eml);
- my $self = $arg->{self};
if (scalar(@$mids) == 0) {
warn "E: $oid has no Message-ID, skipping\n";
if (do_idx($self, $bref, $eml, $smsg)) {
${$arg->{need_checkpoint}} = 1;
}
+ ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
}
# only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
sub update_last_commit {
- my ($self, $sync, $unit, $cmt) = @_;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $last = last_epoch_commit($self, $unit->{epoch});
- if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
- my @cmd = (qw(rev-list --count), "$last..$cmt");
+ if (defined $last && is_ancestor($self->git, $last, $latest_cmt)) {
+ my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
chomp(my $n = $unit->{git}->qx(@cmd));
return if $n ne '' && $n == 0;
}
- last_epoch_commit($self, $unit->{epoch}, $cmt);
+ last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
}
sub last_commits {
my $range = "$cur..$tip";
$pr->("$i.git checking contiguity... ") if $pr;
my $git = $unit->{git};
- if (is_ancestor($git, $cur, $tip)) { # common case
+ if (is_ancestor($sync->{self}->git, $cur, $tip)) { # common case
$pr->("OK\n") if $pr;
my $n = $git->qx(qw(rev-list --count), $range);
chomp($n);
my $pr = $sync->{-opt}->{-progress};
my $regen_max = 0;
my $head = $sync->{ibx}->{ref_head} || 'HEAD';
+ my $pfx;
+ if ($pr) {
+ ($pfx) = ($sync->{ibx}->{inboxdir} =~ m!([^/]+)\z!g);
+ $pfx //= $sync->{ibx}->{inboxdir};
+ }
# reindex stops at the current heads and we later rerun index_sync
# without {reindex}
my $range = log_range($sync, $unit, $tip) or next;
# can't use 'rev-list --count' if we use --diff-filter
- $pr->("$i.git counting $range ... ") if $pr;
+ $pr->("$pfx $i.git counting $range ... ") if $pr;
# Don't bump num_highwater on --reindex by using {D}.
# We intentionally do NOT use {D} in the non-reindex case
# because we want NNTP article number gaps from unindexed
# messages to show up in mirrors, too.
$sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
my $stk = log2stack($sync, $git, $range);
+ return 0 if $sync->{quit};
my $nr = $stk ? $stk->num_records : 0;
$pr->("$nr\n") if $pr;
$unit->{stack} = $stk; # may be undef
unshift @{$sync->{todo}}, $unit;
$regen_max += $nr;
}
+ return 0 if $sync->{quit};
# XXX this should not happen unless somebody bypasses checks in
# our code and blindly injects "d" file history into git repos
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+ local $self->{current_info} = 'leftover ';
my $unindex_oid = $self->can('unindex_oid');
for my $oid (@leftovers) {
+ last if $sync->{quit};
$oid = unpack('H*', $oid);
- $self->{current_info} = "leftover $oid";
my $req = { %$sync, oid => $oid };
$self->git->cat_async($oid, $unindex_oid, $req);
}
$self->git->cat_async_wait;
}
+ return 0 if $sync->{quit};
if (!$regen_max) {
$sync->{-regen_fmt} = "%u/?\n";
return 0;
my @removed = $self->{oidx}->remove_oid($oid, $mid);
for my $num (@removed) {
my $idx = idx_shard($self, $num);
- $idx->shard_remove($oid, $num);
+ $idx->shard_remove($num);
}
}
sub unindex_oid ($$;$) { # git->cat_async callback
my ($bref, $oid, $type, $size, $sync) = @_;
my $self = $sync->{self};
+ local $self->{current_info} = "$self->{current_info} $oid";
my $unindexed = $sync->{in_unindex} ? $sync->{unindexed} : undef;
my $mm = $self->{mm};
my $mids = mids(PublicInbox::Eml->new($bref));
"$beg..$end (% $step)\n");
}
for (my $num = $beg; $num <= $end; $num += $step) {
+ last if $sync->{quit};
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
sub index_todo ($$$) {
my ($self, $sync, $unit) = @_;
+ return if $sync->{quit};
unindex_todo($self, $sync, $unit);
my $stk = delete($unit->{stack}) or return;
my $all = $self->git;
my $index_oid = $self->can('index_oid');
my $unindex_oid = $self->can('unindex_oid');
- my ($pfx) = ($unit->{git}->{git_dir} =~ m!/([^/]+)\z!g);
- $pfx //= $unit->{git}->{git_dir};
- while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
- $self->{current_info} = "$pfx $oid";
- my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+ my $pfx;
+ if ($unit->{git}->{git_dir} =~ m!/([^/]+)/git/([0-9]+\.git)\z!) {
+ $pfx = "$1 $2"; # v2
+ } else { # v1
+ ($pfx) = ($unit->{git}->{git_dir} =~ m!/([^/]+)\z!g);
+ $pfx //= $unit->{git}->{git_dir};
+ }
+ local $self->{current_info} = "$pfx ";
+ local $sync->{latest_cmt} = \(my $latest_cmt);
+ local $sync->{unit} = $unit;
+ while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+ if ($sync->{quit}) {
+ warn "waiting to quit...\n";
+ $all->async_wait_all;
+ $self->update_last_commit($sync);
+ return;
+ }
+ my $req = {
+ %$sync,
+ autime => $at,
+ cotime => $ct,
+ oid => $oid,
+ cur_cmt => $cmt
+ };
if ($f eq 'm') {
if ($sync->{max_size}) {
$all->check_async($oid, \&check_size, $req);
}
}
$all->async_wait_all;
- $self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+ $self->update_last_commit($sync, $stk);
}
sub xapian_only {
if ($seq || !$self->{parallel}) {
my $shard_end = $self->{shards} - 1;
for my $i (0..$shard_end) {
+ last if $sync->{quit};
index_xap_step($self, $sync, $art_beg + $i);
if ($i != $shard_end) {
reindex_checkpoint($self, $sync);
ibx => $self->{ibx},
epoch_max => $epoch_max,
};
+ my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
+
if (sync_prepare($self, $sync)) {
# tmp_clone seems to fail if inside a transaction, so
# we rollback here (because we opened {mm} for reading)
# xapian_only works incrementally w/o --reindex
if ($seq && !$opt->{reindex}) {
- $art_beg = $sync->{mm_tmp}->max;
- $art_beg++ if defined($art_beg);
+ $art_beg = $sync->{mm_tmp}->max || -1;
+ $art_beg++;
}
}
# work forwards through history
- index_todo($self, $sync, $_) for @{$sync->{todo}};
- $self->{oidx}->rethread_done($opt);
+ index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
+ $self->{oidx}->rethread_done($opt) unless $sync->{quit};
$self->done;
if (my $nr = $sync->{nr}) {
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
}
+ my $quit_warn;
# deal with Xapian shards sequentially
if ($seq && delete($sync->{mm_tmp})) {
- $self->{ibx}->{indexlevel} = $idxlevel;
- xapian_only($self, $opt, $sync, $art_beg);
+ if ($sync->{quit}) {
+ $quit_warn = 1;
+ } else {
+ $self->{ibx}->{indexlevel} = $idxlevel;
+ xapian_only($self, $opt, $sync, $art_beg);
+ $quit_warn = 1 if $sync->{quit};
+ }
}
# --reindex on the command-line
- if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') {
+ if (!$sync->{quit} && $opt->{reindex} &&
+ !ref($opt->{reindex}) && $idxlevel ne 'basic') {
$self->lock_acquire;
my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
if (my $xdb = $s0->idx_acquire) {
}
# reindex does not pick up new changes, so we rerun w/o it:
- if ($opt->{reindex}) {
+ if ($opt->{reindex} && !$sync->{quit}) {
my %again = %$opt;
$sync = undef;
delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
+ $opt->{quit} = $again{quit}; # propagate to caller
}
+ warn <<EOF if $quit_warn;
+W: interrupted, --xapian-only --reindex required upon restart
+EOF
}
1;