v1 and v2 inbox indexing now supports graceful shutdown checks
just like ExtSearchIdx. Additionally, we'll consistently
perform quit checks at the top of loops for consistency.
Interaction with the --xapian-only and --sequential-shard
options are a bit lacking, and will warn the user to use
"--reindex --xapian-only" to fix.
warn "E: $ekey unsupported inbox version (v$v)\n";
return;
}
warn "E: $ekey unsupported inbox version (v$v)\n";
return;
}
- unless ($sync->{quit}) {
- index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
- $self->{midx}->index_ibx($ibx) unless $sync->{quit};
+ for my $unit (@{delete($sync->{todo}) // []}) {
+ last if $sync->{quit};
+ index_todo($self, $sync, $unit);
+ $self->{midx}->index_ibx($ibx) unless $sync->{quit};
$ibx->git->cleanup; # done with this inbox, now
}
$ibx->git->cleanup; # done with this inbox, now
}
-regen_fmt => "%u/?\n",
};
local $SIG{USR1} = sub { $need_checkpoint = 1 };
-regen_fmt => "%u/?\n",
};
local $SIG{USR1} = sub { $need_checkpoint = 1 };
- my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+ my $quit = PublicInbox::SearchIdx::quit_cb($sync);
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
- _sync_inbox($self, $sync, $ibx);
+ _sync_inbox($self, $sync, $ibx);
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
PublicInbox::V2Writable::done($self);
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
PublicInbox::V2Writable::done($self);
my ($self, $opt) = @_;
delete $self->{lock_path} if $opt->{-skip_lock};
$self->with_umask(\&_index_sync, $self, $opt);
my ($self, $opt) = @_;
delete $self->{lock_path} if $opt->{-skip_lock};
$self->with_umask(\&_index_sync, $self, $opt);
+ if ($opt->{reindex} && !$opt->{quit}) {
my %again = %$opt;
delete @again{qw(rethread reindex)};
index_sync($self, \%again);
my %again = %$opt;
delete @again{qw(rethread reindex)};
index_sync($self, \%again);
+ $opt->{quit} = $again{quit}; # propagate to caller
if (my $pr = $sync->{-opt}->{-progress}) {
$pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
}
if (my $pr = $sync->{-opt}->{-progress}) {
$pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
}
- if (!$stk) { # more to come
+ if (!$stk && !$sync->{quit}) { # more to come
begin_txn_lazy($self);
$self->{mm}->{dbh}->begin_work;
}
begin_txn_lazy($self);
$self->{mm}->{dbh}->begin_work;
}
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
$oid = unpack('H*', $oid);
$git->cat_async($oid, \&unindex_both, $sync);
}
$oid = unpack('H*', $oid);
$git->cat_async($oid, \&unindex_both, $sync);
}
}
while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
my $arg = { %$sync, cur_cmt => $cur_cmt };
}
while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
my $arg = { %$sync, cur_cmt => $cur_cmt };
if ($f eq 'm') {
$arg->{autime} = $at;
$arg->{cotime} = $ct;
if ($f eq 'm') {
$arg->{autime} = $at;
$arg->{cotime} = $ct;
$git->cat_async($oid, \&unindex_both, $arg);
}
}
$git->cat_async($oid, \&unindex_both, $arg);
}
}
- v1_checkpoint($self, $sync, $stk);
+ v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
ref($reindex) eq 'HASH' ? $reindex->{from} : '';
}
ref($reindex) eq 'HASH' ? $reindex->{from} : '';
}
+sub quit_cb ($) {
+ my ($sync) = @_;
+ sub {
+ # we set {-opt}->{quit} too, so ->index_sync callers
+ # can abort multi-inbox loops this way
+ $sync->{quit} = $sync->{-opt}->{quit} = 1;
+ warn "gracefully quitting\n";
+ }
+}
+
# indexes all unindexed messages (v1 only)
sub _index_sync {
my ($self, $opt) = @_;
# indexes all unindexed messages (v1 only)
sub _index_sync {
my ($self, $opt) = @_;
$ibx->git->batch_prepare;
my $pr = $opt->{-progress};
my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
$ibx->git->batch_prepare;
my $pr = $opt->{-progress};
my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
+ my $quit = quit_cb($sync);
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
my $xdb = $self->begin_txn_lazy;
$self->{oidx}->rethread_prepare($opt);
my $mm = _msgmap_init($self);
my $xdb = $self->begin_txn_lazy;
$self->{oidx}->rethread_prepare($opt);
my $mm = _msgmap_init($self);
my $stk = prepare_stack($sync, $range);
$sync->{ntodo} = $stk ? $stk->num_records : 0;
$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
my $stk = prepare_stack($sync, $range);
$sync->{ntodo} = $stk ? $stk->num_records : 0;
$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
- process_stack($self, $sync, $stk);
+ process_stack($self, $sync, $stk) if !$sync->{quit};
local $self->{current_info} = 'leftover ';
my $unindex_oid = $self->can('unindex_oid');
for my $oid (@leftovers) {
local $self->{current_info} = 'leftover ';
my $unindex_oid = $self->can('unindex_oid');
for my $oid (@leftovers) {
$oid = unpack('H*', $oid);
my $req = { %$sync, oid => $oid };
$self->git->cat_async($oid, $unindex_oid, $req);
$oid = unpack('H*', $oid);
my $req = { %$sync, oid => $oid };
$self->git->cat_async($oid, $unindex_oid, $req);
}
$self->git->cat_async_wait;
}
}
$self->git->cat_async_wait;
}
"$beg..$end (% $step)\n");
}
for (my $num = $beg; $num <= $end; $num += $step) {
"$beg..$end (% $step)\n");
}
for (my $num = $beg; $num <= $end; $num += $step) {
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
local $sync->{latest_cmt} = \(my $latest_cmt);
local $sync->{unit} = $unit;
while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
local $sync->{latest_cmt} = \(my $latest_cmt);
local $sync->{unit} = $unit;
while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+ if ($sync->{quit}) {
+ warn "waiting to quit...\n";
+ $all->async_wait_all;
+ $self->update_last_commit($sync);
+ return;
+ }
my $req = {
%$sync,
autime => $at,
my $req = {
%$sync,
autime => $at,
} elsif ($f eq 'd') {
$all->cat_async($oid, $unindex_oid, $req);
}
} elsif ($f eq 'd') {
$all->cat_async($oid, $unindex_oid, $req);
}
- if ($sync->{quit}) {
- warn "waiting to quit...\n";
- $all->async_wait_all;
- $self->update_last_commit($sync);
- return;
- }
if (${$sync->{need_checkpoint}}) {
reindex_checkpoint($self, $sync);
}
if (${$sync->{need_checkpoint}}) {
reindex_checkpoint($self, $sync);
}
if ($seq || !$self->{parallel}) {
my $shard_end = $self->{shards} - 1;
for my $i (0..$shard_end) {
if ($seq || !$self->{parallel}) {
my $shard_end = $self->{shards} - 1;
for my $i (0..$shard_end) {
index_xap_step($self, $sync, $art_beg + $i);
if ($i != $shard_end) {
reindex_checkpoint($self, $sync);
index_xap_step($self, $sync, $art_beg + $i);
if ($i != $shard_end) {
reindex_checkpoint($self, $sync);
ibx => $self->{ibx},
epoch_max => $epoch_max,
};
ibx => $self->{ibx},
epoch_max => $epoch_max,
};
- my $quit = sub { $sync->{quit} = 1 };
+ my $quit = PublicInbox::SearchIdx::quit_cb($sync);
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
}
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
}
# deal with Xapian shards sequentially
if ($seq && delete($sync->{mm_tmp})) {
# deal with Xapian shards sequentially
if ($seq && delete($sync->{mm_tmp})) {
- $self->{ibx}->{indexlevel} = $idxlevel;
- xapian_only($self, $opt, $sync, $art_beg);
+ if ($sync->{quit}) {
+ $quit_warn = 1;
+ } else {
+ $self->{ibx}->{indexlevel} = $idxlevel;
+ xapian_only($self, $opt, $sync, $art_beg);
+ $quit_warn = 1 if $sync->{quit};
+ }
}
# --reindex on the command-line
}
# --reindex on the command-line
- if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') {
+ if (!$sync->{quit} && $opt->{reindex} &&
+ !ref($opt->{reindex}) && $idxlevel ne 'basic') {
$self->lock_acquire;
my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
if (my $xdb = $s0->idx_acquire) {
$self->lock_acquire;
my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
if (my $xdb = $s0->idx_acquire) {
}
# reindex does not pick up new changes, so we rerun w/o it:
}
# reindex does not pick up new changes, so we rerun w/o it:
+ if ($opt->{reindex} && !$sync->{quit}) {
my %again = %$opt;
$sync = undef;
delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
my %again = %$opt;
$sync = undef;
delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
+ $opt->{quit} = $again{quit}; # propagate to caller
+ warn <<EOF if $quit_warn;
+W: interrupted, --xapian-only --reindex required upon restart
+EOF
$ibx_opt = { %$opt, sequential_shard => $v };
}
PublicInbox::Admin::index_inbox($ibx, undef, $ibx_opt);
$ibx_opt = { %$opt, sequential_shard => $v };
}
PublicInbox::Admin::index_inbox($ibx, undef, $ibx_opt);
+ last if $ibx_opt->{quit};
if (my $copt = $opt->{compact_opt}) {
local $copt->{jobs} = 0 if $ibx_opt->{sequential_shard};
PublicInbox::Xapcmd::run($ibx, 'compact', $copt);
if (my $copt = $opt->{compact_opt}) {
local $copt->{jobs} = 0 if $ibx_opt->{sequential_shard};
PublicInbox::Xapcmd::run($ibx, 'compact', $copt);