]> Sergey Matveev's repositories - public-inbox.git/commitdiff
extindex: support graceful shutdown via QUIT/INT/TERM
authorEric Wong <e@80x24.org>
Fri, 13 Nov 2020 11:11:44 +0000 (11:11 +0000)
committerEric Wong <e@80x24.org>
Sun, 15 Nov 2020 06:12:43 +0000 (06:12 +0000)
Just like the daemon processes, -extindex now supports graceful
shutdown via the same signals.  This lets users avoid having to
repeat indexing messages when a power outage strikes during a
long (multi-hour/day) indexing run.

Per-inbox (v1/v2) -index graceful shutdowns are not supported,
yet, but is planned for later.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/IdxStack.pm
lib/PublicInbox/SearchIdxShard.pm
lib/PublicInbox/V2Writable.pm

index 6c09c460badb1517f881761e1fd01230762db8cc..91434b26d2519f2e7799927cab8a2e74ed21e3ce 100644 (file)
@@ -329,13 +329,18 @@ sub eidx_sync { # main entry point
                -regen_fmt => "%u/?\n",
        };
        local $SIG{USR1} = sub { $need_checkpoint = 1 };
+       my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+       local $SIG{QUIT} = $quit;
+       local $SIG{INT} = $quit;
+       local $SIG{TERM} = $quit;
 
        # don't use $_ here, it'll get clobbered by reindex_checkpoint
        for my $ibx (@{$self->{ibx_list}}) {
                _sync_inbox($self, $sync, $ibx);
+               last if $sync->{quit};
        }
 
-       $self->{oidx}->rethread_done($opt);
+       $self->{oidx}->rethread_done($opt) unless $sync->{quit};
 
        PublicInbox::V2Writable::done($self);
 }
index e7e10de94da8cc3a9aa97322e052c9834a944146..c55c5c36790418a5579abd0f79a1b4075f9020b3 100644 (file)
@@ -11,6 +11,8 @@ use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 # start off in write-only mode
 sub new {
        open(my $io, '+>', undef) or die "open: $!";
+       # latest_cmt is still useful when the newest revision is a `d'(elete),
+       # otherwise we favor $sync->{latest_cmt} for checkpoints and {quit}
        bless { wr => $io, latest_cmt => $_[1] }, __PACKAGE__
 }
 
index 1333b30529e857b5fb205dc4b24e0c8275b30e66..875a9ec92e01220abc56ddf99b6d2c1feb1597cd 100644 (file)
@@ -10,6 +10,7 @@ use parent qw(PublicInbox::SearchIdx);
 use bytes qw(length);
 use IO::Handle (); # autoflush
 use PublicInbox::Eml;
+use PublicInbox::Sigfd;
 
 sub new {
        my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
@@ -29,9 +30,13 @@ sub spawn_worker {
        my ($r, $w);
        pipe($r, $w) or die "pipe failed: $!\n";
        $w->autoflush(1);
+       my $oldset = PublicInbox::Sigfd::block_signals();
        my $pid = fork;
        defined $pid or die "fork failed: $!\n";
        if ($pid == 0) {
+               # these signals are localized in parent
+               $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+               PublicInbox::Sigfd::sig_setmask($oldset);
                my $bnote = $v2w->atfork_child;
                close $w or die "failed to close: $!";
 
@@ -44,6 +49,7 @@ sub spawn_worker {
                die "unexpected MM $self->{mm}" if $self->{mm};
                exit;
        }
+       PublicInbox::Sigfd::sig_setmask($oldset);
        $self->{pid} = $pid;
        $self->{w} = $w;
        close $r or die "failed to close: $!";
index 11cde627f92b82a98dcade60767b40e073006753..5bac04a47bb1345138fd5030d12db170e32b55fb 100644 (file)
@@ -1090,6 +1090,7 @@ sub sync_prepare ($$) {
                $unit->{stack} = $stk; # may be undef
                unshift @{$sync->{todo}}, $unit;
                $regen_max += $nr;
+               last if $sync->{quit};
        }
 
        # XXX this should not happen unless somebody bypasses checks in
@@ -1102,9 +1103,11 @@ sub sync_prepare ($$) {
                        $oid = unpack('H*', $oid);
                        my $req = { %$sync, oid => $oid };
                        $self->git->cat_async($oid, $unindex_oid, $req);
+                       last if $sync->{quit};
                }
                $self->git->cat_async_wait;
        }
+       return 0 if $sync->{quit};
        if (!$regen_max) {
                $sync->{-regen_fmt} = "%u/?\n";
                return 0;
@@ -1236,6 +1239,7 @@ sub index_xap_step ($$$;$) {
 
 sub index_todo ($$$) {
        my ($self, $sync, $unit) = @_;
+       return if $sync->{quit};
        unindex_todo($self, $sync, $unit);
        my $stk = delete($unit->{stack}) or return;
        my $all = $self->git;
@@ -1268,6 +1272,12 @@ sub index_todo ($$$) {
                } elsif ($f eq 'd') {
                        $all->cat_async($oid, $unindex_oid, $req);
                }
+               if ($sync->{quit}) {
+                       warn "waiting to quit...\n";
+                       $all->async_wait_all;
+                       $self->update_last_commit($sync);
+                       return;
+               }
                if (${$sync->{need_checkpoint}}) {
                        reindex_checkpoint($self, $sync);
                }
@@ -1334,6 +1344,11 @@ sub index_sync {
                ibx => $self->{ibx},
                epoch_max => $epoch_max,
        };
+       my $quit = sub { $sync->{quit} = 1 };
+       local $SIG{QUIT} = $quit;
+       local $SIG{INT} = $quit;
+       local $SIG{TERM} = $quit;
+
        if (sync_prepare($self, $sync)) {
                # tmp_clone seems to fail if inside a transaction, so
                # we rollback here (because we opened {mm} for reading)
@@ -1352,7 +1367,7 @@ sub index_sync {
        }
        # work forwards through history
        index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
-       $self->{oidx}->rethread_done($opt);
+       $self->{oidx}->rethread_done($opt) unless $sync->{quit};
        $self->done;
 
        if (my $nr = $sync->{nr}) {