]> Sergey Matveev's repositories - public-inbox.git/commitdiff
*index: more consistent graceful shutdown checks
authorEric Wong <e@80x24.org>
Fri, 27 Nov 2020 21:33:55 +0000 (21:33 +0000)
committerEric Wong <e@80x24.org>
Sat, 28 Nov 2020 04:53:25 +0000 (04:53 +0000)
v1 and v2 inbox indexing now supports graceful shutdown checks
just like ExtSearchIdx.  Additionally, we'll consistently
perform quit checks at the top of loops for consistency.

Interaction with the --xapian-only and --sequential-shard
options are a bit lacking, and will warn the user to use
"--reindex --xapian-only" to fix.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/V2Writable.pm
script/public-inbox-index

index 7ab0c4af64a9a3c5951cf796bd017f51d3984947..cf90c562ebe27163248ded0ee0138f60d3132e03 100644 (file)
@@ -309,10 +309,11 @@ sub _sync_inbox ($$$) {
                warn "E: $ekey unsupported inbox version (v$v)\n";
                return;
        }
-       unless ($sync->{quit}) {
-               index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
-               $self->{midx}->index_ibx($ibx) unless $sync->{quit};
+       for my $unit (@{delete($sync->{todo}) // []}) {
+               last if $sync->{quit};
+               index_todo($self, $sync, $unit);
        }
+       $self->{midx}->index_ibx($ibx) unless $sync->{quit};
        $ibx->git->cleanup; # done with this inbox, now
 }
 
@@ -334,17 +335,16 @@ sub eidx_sync { # main entry point
                -regen_fmt => "%u/?\n",
        };
        local $SIG{USR1} = sub { $need_checkpoint = 1 };
-       my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+       my $quit = PublicInbox::SearchIdx::quit_cb($sync);
        local $SIG{QUIT} = $quit;
        local $SIG{INT} = $quit;
        local $SIG{TERM} = $quit;
 
        # don't use $_ here, it'll get clobbered by reindex_checkpoint
        for my $ibx (@{$self->{ibx_list}}) {
-               _sync_inbox($self, $sync, $ibx);
                last if $sync->{quit};
+               _sync_inbox($self, $sync, $ibx);
        }
-
        $self->{oidx}->rethread_done($opt) unless $sync->{quit};
 
        PublicInbox::V2Writable::done($self);
index 18390602faada97021c0b774ac944550208ac70e..d06c159b8d48bdc052555104ddbbf614cba2d33b 100644 (file)
@@ -632,10 +632,11 @@ sub index_sync {
        my ($self, $opt) = @_;
        delete $self->{lock_path} if $opt->{-skip_lock};
        $self->with_umask(\&_index_sync, $self, $opt);
-       if ($opt->{reindex}) {
+       if ($opt->{reindex} && !$opt->{quit}) {
                my %again = %$opt;
                delete @again{qw(rethread reindex)};
                index_sync($self, \%again);
+               $opt->{quit} = $again{quit}; # propagate to caller
        }
 }
 
@@ -688,7 +689,7 @@ sub v1_checkpoint ($$;$) {
        if (my $pr = $sync->{-opt}->{-progress}) {
                $pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
        }
-       if (!$stk) { # more to come
+       if (!$stk && !$sync->{quit}) { # more to come
                begin_txn_lazy($self);
                $self->{mm}->{dbh}->begin_work;
        }
@@ -709,6 +710,7 @@ sub process_stack {
        if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
                warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
                for my $oid (@leftovers) {
+                       last if $sync->{quit};
                        $oid = unpack('H*', $oid);
                        $git->cat_async($oid, \&unindex_both, $sync);
                }
@@ -718,6 +720,7 @@ sub process_stack {
        }
        while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
                my $arg = { %$sync, cur_cmt => $cur_cmt };
+               last if $sync->{quit};
                if ($f eq 'm') {
                        $arg->{autime} = $at;
                        $arg->{cotime} = $ct;
@@ -731,7 +734,7 @@ sub process_stack {
                        $git->cat_async($oid, \&unindex_both, $arg);
                }
        }
-       v1_checkpoint($self, $sync, $stk);
+       v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
 }
 
 sub log2stack ($$$) {
@@ -841,6 +844,16 @@ sub reindex_from ($$) {
        ref($reindex) eq 'HASH' ? $reindex->{from} : '';
 }
 
+sub quit_cb ($) {
+       my ($sync) = @_;
+       sub {
+               # we set {-opt}->{quit} too, so ->index_sync callers
+               # can abort multi-inbox loops this way
+               $sync->{quit} = $sync->{-opt}->{quit} = 1;
+               warn "gracefully quitting\n";
+       }
+}
+
 # indexes all unindexed messages (v1 only)
 sub _index_sync {
        my ($self, $opt) = @_;
@@ -850,6 +863,10 @@ sub _index_sync {
        $ibx->git->batch_prepare;
        my $pr = $opt->{-progress};
        my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
+       my $quit = quit_cb($sync);
+       local $SIG{QUIT} = $quit;
+       local $SIG{INT} = $quit;
+       local $SIG{TERM} = $quit;
        my $xdb = $self->begin_txn_lazy;
        $self->{oidx}->rethread_prepare($opt);
        my $mm = _msgmap_init($self);
@@ -870,7 +887,7 @@ sub _index_sync {
        my $stk = prepare_stack($sync, $range);
        $sync->{ntodo} = $stk ? $stk->num_records : 0;
        $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
-       process_stack($self, $sync, $stk);
+       process_stack($self, $sync, $stk) if !$sync->{quit};
 }
 
 sub DESTROY {
index 7bef1c89a21ef220933ff67bd9992cd72b7a1ab1..a3938b5628f854ea1508b7063623c312dcf2a4f1 100644 (file)
@@ -1106,10 +1106,10 @@ sub sync_prepare ($$) {
                local $self->{current_info} = 'leftover ';
                my $unindex_oid = $self->can('unindex_oid');
                for my $oid (@leftovers) {
+                       last if $sync->{quit};
                        $oid = unpack('H*', $oid);
                        my $req = { %$sync, oid => $oid };
                        $self->git->cat_async($oid, $unindex_oid, $req);
-                       last if $sync->{quit};
                }
                $self->git->cat_async_wait;
        }
@@ -1233,6 +1233,7 @@ sub index_xap_step ($$$;$) {
                        "$beg..$end (% $step)\n");
        }
        for (my $num = $beg; $num <= $end; $num += $step) {
+               last if $sync->{quit};
                my $smsg = $ibx->over->get_art($num) or next;
                $smsg->{self} = $self;
                $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
@@ -1262,6 +1263,12 @@ sub index_todo ($$$) {
        local $sync->{latest_cmt} = \(my $latest_cmt);
        local $sync->{unit} = $unit;
        while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+               if ($sync->{quit}) {
+                       warn "waiting to quit...\n";
+                       $all->async_wait_all;
+                       $self->update_last_commit($sync);
+                       return;
+               }
                my $req = {
                        %$sync,
                        autime => $at,
@@ -1278,12 +1285,6 @@ sub index_todo ($$$) {
                } elsif ($f eq 'd') {
                        $all->cat_async($oid, $unindex_oid, $req);
                }
-               if ($sync->{quit}) {
-                       warn "waiting to quit...\n";
-                       $all->async_wait_all;
-                       $self->update_last_commit($sync);
-                       return;
-               }
                if (${$sync->{need_checkpoint}}) {
                        reindex_checkpoint($self, $sync);
                }
@@ -1310,6 +1311,7 @@ sub xapian_only {
                if ($seq || !$self->{parallel}) {
                        my $shard_end = $self->{shards} - 1;
                        for my $i (0..$shard_end) {
+                               last if $sync->{quit};
                                index_xap_step($self, $sync, $art_beg + $i);
                                if ($i != $shard_end) {
                                        reindex_checkpoint($self, $sync);
@@ -1350,7 +1352,7 @@ sub index_sync {
                ibx => $self->{ibx},
                epoch_max => $epoch_max,
        };
-       my $quit = sub { $sync->{quit} = 1 };
+       my $quit = PublicInbox::SearchIdx::quit_cb($sync);
        local $SIG{QUIT} = $quit;
        local $SIG{INT} = $quit;
        local $SIG{TERM} = $quit;
@@ -1381,14 +1383,21 @@ sub index_sync {
                $pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr;
        }
 
+       my $quit_warn;
        # deal with Xapian shards sequentially
        if ($seq && delete($sync->{mm_tmp})) {
-               $self->{ibx}->{indexlevel} = $idxlevel;
-               xapian_only($self, $opt, $sync, $art_beg);
+               if ($sync->{quit}) {
+                       $quit_warn = 1;
+               } else {
+                       $self->{ibx}->{indexlevel} = $idxlevel;
+                       xapian_only($self, $opt, $sync, $art_beg);
+                       $quit_warn = 1 if $sync->{quit};
+               }
        }
 
        # --reindex on the command-line
-       if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') {
+       if (!$sync->{quit} && $opt->{reindex} &&
+                       !ref($opt->{reindex}) && $idxlevel ne 'basic') {
                $self->lock_acquire;
                my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
                if (my $xdb = $s0->idx_acquire) {
@@ -1400,12 +1409,16 @@ sub index_sync {
        }
 
        # reindex does not pick up new changes, so we rerun w/o it:
-       if ($opt->{reindex}) {
+       if ($opt->{reindex} && !$sync->{quit}) {
                my %again = %$opt;
                $sync = undef;
                delete @again{qw(rethread reindex -skip_lock)};
                index_sync($self, \%again);
+               $opt->{quit} = $again{quit}; # propagate to caller
        }
+       warn <<EOF if $quit_warn;
+W: interrupted, --xapian-only --reindex required upon restart
+EOF
 }
 
 1;
index 55e4f641f89e939481d5d72197beedf1c9bca1ad..5909801e7a3ad760fb714b9f5d397f7d0cf47820 100755 (executable)
@@ -90,6 +90,7 @@ EOL
                $ibx_opt = { %$opt, sequential_shard => $v };
        }
        PublicInbox::Admin::index_inbox($ibx, undef, $ibx_opt);
+       last if $ibx_opt->{quit};
        if (my $copt = $opt->{compact_opt}) {
                local $copt->{jobs} = 0 if $ibx_opt->{sequential_shard};
                PublicInbox::Xapcmd::run($ibx, 'compact', $copt);