sub count_shards ($) {
my ($self) = @_;
- $self->{ibx} ? do {
+ if (my $ibx = $self->{ibx}) {
# always load existing shards in case core count changes:
# Also, shard count may change while -watch is running
- my $srch = $self->{ibx}->search or return 0;
- delete $self->{ibx}->{search};
+ my $srch = $ibx->search or return 0;
+ delete $ibx->{search};
$srch->{nshard} // 0
- } : $self->{nshard}; # self->{nshard} is for ExtSearchIdx
+ } else { # ExtSearchIdx
+ $self->{nshard} // do {
+ if ($self->xdb_sharded) {
+ $self->{nshard} // die 'BUG: {nshard} unset';
+ } else {
+ 0;
+ }
+ }
+ }
}
sub new {
$_->shard_commit for @$shards;
}
+ my $midx = $self->{midx}; # misc index
+ $midx->commit_txn if $midx;
+
# last_commit is special, don't commit these until
# Xapian shards are done:
$dbh->begin_work if $dbh;
$dbh->commit;
$dbh->begin_work;
}
+ $midx->begin_txn if $midx;
}
$self->{total_bytes} += $self->{transact_bytes};
$self->{transact_bytes} = 0;
}
eval { $self->{oidx}->dbh_close };
$err .= "over close: $@\n" if $@;
+ delete $self->{midx};
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
local $self->{current_info} = 'leftover ';
my $unindex_oid = $self->can('unindex_oid');
for my $oid (@leftovers) {
+ last if $sync->{quit};
$oid = unpack('H*', $oid);
my $req = { %$sync, oid => $oid };
$self->git->cat_async($oid, $unindex_oid, $req);
- last if $sync->{quit};
}
$self->git->cat_async_wait;
}
my @removed = $self->{oidx}->remove_oid($oid, $mid);
for my $num (@removed) {
my $idx = idx_shard($self, $num);
- $idx->shard_remove($oid, $num);
+ $idx->shard_remove($num);
}
}
"$beg..$end (% $step)\n");
}
for (my $num = $beg; $num <= $end; $num += $step) {
+ last if $sync->{quit};
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
local $sync->{latest_cmt} = \(my $latest_cmt);
local $sync->{unit} = $unit;
while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+ if ($sync->{quit}) {
+ warn "waiting to quit...\n";
+ $all->async_wait_all;
+ $self->update_last_commit($sync);
+ return;
+ }
my $req = {
%$sync,
autime => $at,
} elsif ($f eq 'd') {
$all->cat_async($oid, $unindex_oid, $req);
}
- if ($sync->{quit}) {
- warn "waiting to quit...\n";
- $all->async_wait_all;
- $self->update_last_commit($sync);
- return;
- }
if (${$sync->{need_checkpoint}}) {
reindex_checkpoint($self, $sync);
}
if ($seq || !$self->{parallel}) {
my $shard_end = $self->{shards} - 1;
for my $i (0..$shard_end) {
+ last if $sync->{quit};
index_xap_step($self, $sync, $art_beg + $i);
if ($i != $shard_end) {
reindex_checkpoint($self, $sync);
ibx => $self->{ibx},
epoch_max => $epoch_max,
};
- my $quit = sub { $sync->{quit} = 1 };
+ my $quit = PublicInbox::SearchIdx::quit_cb($sync);
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
# xapian_only works incrementally w/o --reindex
if ($seq && !$opt->{reindex}) {
- $art_beg = $sync->{mm_tmp}->max;
- $art_beg++ if defined($art_beg);
+ $art_beg = $sync->{mm_tmp}->max || -1;
+ $art_beg++;
}
}
# work forwards through history
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
}
+ my $quit_warn;
# deal with Xapian shards sequentially
if ($seq && delete($sync->{mm_tmp})) {
- $self->{ibx}->{indexlevel} = $idxlevel;
- xapian_only($self, $opt, $sync, $art_beg);
+ if ($sync->{quit}) {
+ $quit_warn = 1;
+ } else {
+ $self->{ibx}->{indexlevel} = $idxlevel;
+ xapian_only($self, $opt, $sync, $art_beg);
+ $quit_warn = 1 if $sync->{quit};
+ }
}
# --reindex on the command-line
- if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') {
+ if (!$sync->{quit} && $opt->{reindex} &&
+ !ref($opt->{reindex}) && $idxlevel ne 'basic') {
$self->lock_acquire;
my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
if (my $xdb = $s0->idx_acquire) {
}
# reindex does not pick up new changes, so we rerun w/o it:
- if ($opt->{reindex}) {
+ if ($opt->{reindex} && !$sync->{quit}) {
my %again = %$opt;
$sync = undef;
delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
+ $opt->{quit} = $again{quit}; # propagate to caller
}
+ warn <<EOF if $quit_warn;
+W: interrupted, --xapian-only --reindex required upon restart
+EOF
}
1;