]> Sergey Matveev's repositories - public-inbox.git/commitdiff
extindex: --watch for inotify-based updates
authorEric Wong <e@80x24.org>
Sat, 26 Dec 2020 01:44:37 +0000 (01:44 +0000)
committerEric Wong <e@80x24.org>
Sun, 27 Dec 2020 09:30:33 +0000 (09:30 +0000)
This reuses existing InboxIdle infrastructure to update external
indices based on per-inbox updates.  This is an alternative to
auto-updating external indices via the -index command and also
works with existing uses of -mda and public-inbox-watch.

Using inotify (or EVFILT_VNODE) allows watching thousands of
inboxes without having to scan every single one at every
invocation.

This is especially beneficial in cases where an external index
is not writable to the users writing to per-inbox indices.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/InboxIdle.pm
lib/PublicInbox/OverIdx.pm
lib/PublicInbox/V2Writable.pm
script/public-inbox-extindex

index 64ebf6db4837ba9899bca11ea5882d4dccfd0e01..53ff2ca183656e5fd1a6dd72b58db9e46096691d 100644 (file)
@@ -630,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing
        my $dbh = $self->{oidx}->dbh;
        my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
        ${$sync->{nr}} = 0;
-       $sync->{-regen_fmt} = "%u/$tot\n";
+       local $sync->{-regen_fmt} = "%u/$tot\n";
        my $pr = $sync->{-opt}->{-progress};
        if ($pr) {
                my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
@@ -709,7 +709,8 @@ sub _reindex_check_unseen ($$$) {
        my $msgs;
        my $pr = $sync->{-opt}->{-progress};
        my $ekey = $ibx->eidx_key;
-       $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+       local $sync->{-regen_fmt} =
+                       "$ekey checking unseen %u/".$ibx->over->max."\n";
        ${$sync->{nr}} = 0;
 
        while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
@@ -752,7 +753,7 @@ sub _reindex_check_stale ($$$) {
        my $pr = $sync->{-opt}->{-progress};
        my $fetching;
        my $ekey = $ibx->eidx_key;
-       $sync->{-regen_fmt} =
+       local $sync->{-regen_fmt} =
                        "$ekey check stale/missing %u/".$ibx->over->max."\n";
        ${$sync->{nr}} = 0;
        do {
@@ -838,6 +839,13 @@ sub eidx_reindex {
        eidxq_process($self, $sync) unless $sync->{quit};
 }
 
+sub sync_inbox {
+       my ($self, $sync, $ibx) = @_;
+       my $err = _sync_inbox($self, $sync, $ibx);
+       delete @$ibx{qw(mm over)};
+       warn $err, "\n" if defined($err);
+}
+
 sub eidx_sync { # main entry point
        my ($self, $opt) = @_;
 
@@ -868,22 +876,21 @@ sub eidx_sync { # main entry point
                $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
        }
        if (delete($opt->{reindex})) {
-               $sync->{checkpoint_unlocks} = 1;
+               local $sync->{checkpoint_unlocks} = 1;
                eidx_reindex($self, $sync);
        }
 
        # don't use $_ here, it'll get clobbered by reindex_checkpoint
        for my $ibx (@{$self->{ibx_list}}) {
                last if $sync->{quit};
-               my $err = _sync_inbox($self, $sync, $ibx);
-               delete @$ibx{qw(mm over)};
-               warn $err, "\n" if defined($err);
+               sync_inbox($self, $sync, $ibx);
        }
        $self->{oidx}->rethread_done($opt) unless $sync->{quit};
        eidxq_process($self, $sync) unless $sync->{quit};
 
        eidxq_release($self);
-       PublicInbox::V2Writable::done($self);
+       done($self);
+       $sync; # for eidx_watch
 }
 
 sub update_last_commit { # overrides V2Writable
@@ -970,6 +977,109 @@ sub idx_init { # similar to V2Writable
        $self->{midx}->begin_txn;
 }
 
+sub _watch_commit { # PublicInbox::DS::add_timer callback
+       my ($self) = @_;
+       delete $self->{-commit_timer};
+       eidxq_process($self, $self->{-watch_sync});
+       eidxq_release($self);
+       delete local $self->{-watch_sync}->{-regen_fmt};
+       reindex_checkpoint($self, $self->{-watch_sync});
+
+       # call event_step => done unless commit_timer is armed
+       PublicInbox::DS::requeue($self);
+}
+
+sub on_inbox_unlock { # called by PublicInbox::InboxIdle
+       my ($self, $ibx) = @_;
+       my $opt = $self->{-watch_sync}->{-opt};
+       my $pr = $opt->{-progress};
+       my $ekey = $ibx->eidx_key;
+       local $0 = "sync $ekey";
+       $pr->("indexing $ekey\n") if $pr;
+       $self->idx_init($opt);
+       sync_inbox($self, $self->{-watch_sync}, $ibx);
+       $self->{-commit_timer} //= PublicInbox::DS::add_timer(
+                                       $opt->{'commit-interval'} // 10,
+                                       \&_watch_commit, $self);
+}
+
+sub eidx_reload { # -extindex --watch SIGHUP handler
+       my ($self, $idler) = @_;
+       if ($self->{cfg}) {
+               my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+               $pr->('reloading ...') if $pr;
+               @{$self->{ibx_list}} = ();
+               %{$self->{ibx_map}} = ();
+               delete $self->{-watch_sync}->{id2pos};
+               my $cfg = PublicInbox::Config->new;
+               attach_config($self, $cfg);
+               $idler->refresh($cfg);
+               $pr->(" done\n") if $pr;
+       } else {
+               warn "reload not supported without --all\n";
+       }
+}
+
+sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+       my ($self) = @_;
+       $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
+       PublicInbox::DS::requeue($self); # trigger our ->event_step
+}
+
+sub event_step { # PublicInbox::DS::requeue callback
+       my ($self) = @_;
+       if (my $resync_queue = $self->{-resync_queue}) {
+               if (my $ibx = shift(@$resync_queue)) {
+                       on_inbox_unlock($self, $ibx);
+                       PublicInbox::DS::requeue($self);
+               } else {
+                       delete $self->{-resync_queue};
+                       _watch_commit($self);
+               }
+       } else {
+               done($self) unless $self->{-commit_timer};
+       }
+}
+
+sub eidx_watch { # public-inbox-extindex --watch main loop
+       my ($self, $opt) = @_;
+       require PublicInbox::InboxIdle;
+       require PublicInbox::DS;
+       require PublicInbox::Syscall;
+       require PublicInbox::Sigfd;
+       my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+       if (!$self->{cfg}) {
+               $idler->watch_inbox($_) for @{$self->{ibx_list}};
+       }
+       $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+       my $sync = eidx_sync($self, $opt); # initial sync
+       return if $sync->{quit};
+       my $oldset = PublicInbox::Sigfd::block_signals();
+       local $self->{current_info} = '';
+       my $cb = $SIG{__WARN__} || \&CORE::warn;
+       local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+       my $sig = {
+               HUP => sub { eidx_reload($self, $idler) },
+               USR1 => sub { eidx_resync_start($self) },
+               TSTP => sub { kill('STOP', $$) },
+       };
+       my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+       $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
+       my $sigfd = PublicInbox::Sigfd->new($sig,
+                                       $PublicInbox::Syscall::SFD_NONBLOCK);
+       local %SIG = (%SIG, %$sig) if !$sigfd;
+       local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+       if (!$sigfd) {
+               # wake up every second to accept signals if we don't
+               # have signalfd or IO::KQueue:
+               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS->SetLoopTimeout(1000);
+       }
+       PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+       PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+       done($self);
+}
+
 no warnings 'once';
 *done = \&PublicInbox::V2Writable::done;
 *with_umask = \&PublicInbox::InboxWritable::with_umask;
index 508007d7efe1bf743a561980c38b85ba628b2020..35aed696daff61454ba5f6370020b848b6b8fb70 100644 (file)
@@ -63,6 +63,9 @@ sub refresh {
        $pi_cfg->each_inbox(\&in2_arm, $self);
 }
 
+# internal API for ease-of-use
+sub watch_inbox { in2_arm($_[1], $_[0]) };
+
 sub new {
        my ($class, $pi_cfg) = @_;
        my $self = bless {}, $class;
@@ -78,7 +81,7 @@ sub new {
        $self->{inot} = $inot;
        $self->{pathmap} = {}; # inboxdir => [ ibx, watch1, watch2, watch3...]
        $self->{on_unlock} = {}; # lock path => ibx
-       refresh($self, $pi_cfg);
+       refresh($self, $pi_cfg) if $pi_cfg;
        PublicInbox::FakeInotify::poll_once($self) if !$ino_cls;
        $self;
 }
@@ -89,7 +92,8 @@ sub event_step {
                my @events = $self->{inot}->read; # Linux::Inotify2::read
                my $on_unlock = $self->{on_unlock};
                for my $ev (@events) {
-                       if (my $ibx = $on_unlock->{$ev->fullname}) {
+                       my $fn = $ev->fullname // next; # cancelled
+                       if (my $ibx = $on_unlock->{$fn}) {
                                $ibx->on_unlock;
                        }
                }
index 4a39bf5346285fc9dd3c464545cd48e3b6db02c6..dcc2cff34a42128116374669d037384a96f407aa 100644 (file)
@@ -473,10 +473,14 @@ sub dbh_close {
 
 sub create {
        my ($self) = @_;
-       unless (-r $self->{filename}) {
+       my $fn = $self->{filename} // do {
+               Carp::confess('BUG: no {filename}') unless $self->{dbh};
+               return;
+       };
+       unless (-r $fn) {
                require File::Path;
                require File::Basename;
-               File::Path::mkpath(File::Basename::dirname($self->{filename}));
+               File::Path::mkpath(File::Basename::dirname($fn));
        }
        # create the DB:
        PublicInbox::Over::dbh($self);
index f20b5c7f0a75d5f22a0814929f5fc0cd0c3c6734..567582c5ff7979ba74276d23cdc5c9c24649604d 100644 (file)
@@ -879,7 +879,7 @@ sub reindex_checkpoint ($$) {
                $self->done; # release lock
        }
 
-       if (my $pr = $sync->{-opt}->{-progress}) {
+       if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
                $pr->(sprintf($sync->{-regen_fmt}, ${$sync->{nr}}));
        }
 
index 17ad59fac3e21ba00ed7cdcb5710dec5e5b7734e..607baa3e0ef785b3e8cd31d06f5449a55bc07352 100644 (file)
@@ -11,6 +11,7 @@ usage: public-inbox-extindex [options] EXTINDEX_DIR [INBOX_DIR]
   Create and update external (detached) search indices
 
   --no-fsync          speed up indexing, risk corruption on power outage
+  --watch             run persistently and watch for inbox updates
   -L LEVEL            `medium', or `full' (default: full)
   --all               index all configured inboxes
   --jobs=NUM          set or disable parallelization (NUM=0)
@@ -27,7 +28,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i
                fsync|sync!
                indexlevel|index-level|L=s max_size|max-size=s
                batch_size|batch-size=s
-               gc
+               gc commit-interval=i watch
                all help|h))
        or die $help;
 if ($opt->{help}) { print $help; exit 0 };
@@ -41,7 +42,8 @@ my $cfg = PublicInbox::Config->new;
 my @ibxs;
 if ($opt->{gc}) {
        die "E: inbox paths must not be specified with --gc\n" if @ARGV;
-       die "E: --all not compatible --gc\n" if $opt->{all};
+       die "E: --all not compatible with --gc\n" if $opt->{all};
+       die "E: --watch is not compatible with --gc\n" if $opt->{watch};
 } else {
        @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
 }
@@ -56,6 +58,15 @@ if ($opt->{gc}) {
        $eidx->attach_config($cfg);
        $eidx->eidx_gc($opt);
 } else {
-       $eidx->attach_inbox($_) for @ibxs;
-       $eidx->eidx_sync($opt);
+       if ($opt->{all}) {
+               $eidx->attach_config($cfg);
+       } else {
+               $eidx->attach_inbox($_) for @ibxs;
+       }
+       if ($opt->{watch}) {
+               $cfg = undef; # save memory only after SIGHUP
+               $eidx->eidx_watch($opt);
+       } else {
+               $eidx->eidx_sync($opt);
+       }
 }