my $dbh = $self->{oidx}->dbh;
my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
${$sync->{nr}} = 0;
- $sync->{-regen_fmt} = "%u/$tot\n";
+ local $sync->{-regen_fmt} = "%u/$tot\n";
my $pr = $sync->{-opt}->{-progress};
if ($pr) {
my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
my $msgs;
my $pr = $sync->{-opt}->{-progress};
my $ekey = $ibx->eidx_key;
- $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+ local $sync->{-regen_fmt} =
+ "$ekey checking unseen %u/".$ibx->over->max."\n";
${$sync->{nr}} = 0;
while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
my $pr = $sync->{-opt}->{-progress};
my $fetching;
my $ekey = $ibx->eidx_key;
- $sync->{-regen_fmt} =
+ local $sync->{-regen_fmt} =
"$ekey check stale/missing %u/".$ibx->over->max."\n";
${$sync->{nr}} = 0;
do {
eidxq_process($self, $sync) unless $sync->{quit};
}
+sub sync_inbox {
+ my ($self, $sync, $ibx) = @_;
+ my $err = _sync_inbox($self, $sync, $ibx);
+ delete @$ibx{qw(mm over)};
+ warn $err, "\n" if defined($err);
+}
+
sub eidx_sync { # main entry point
my ($self, $opt) = @_;
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
if (delete($opt->{reindex})) {
- $sync->{checkpoint_unlocks} = 1;
+ local $sync->{checkpoint_unlocks} = 1;
eidx_reindex($self, $sync);
}
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
last if $sync->{quit};
- my $err = _sync_inbox($self, $sync, $ibx);
- delete @$ibx{qw(mm over)};
- warn $err, "\n" if defined($err);
+ sync_inbox($self, $sync, $ibx);
}
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
eidxq_process($self, $sync) unless $sync->{quit};
eidxq_release($self);
- PublicInbox::V2Writable::done($self);
+ done($self);
+ $sync; # for eidx_watch
}
sub update_last_commit { # overrides V2Writable
$self->{midx}->begin_txn;
}
+sub _watch_commit { # PublicInbox::DS::add_timer callback
+ my ($self) = @_;
+ delete $self->{-commit_timer};
+ eidxq_process($self, $self->{-watch_sync});
+ eidxq_release($self);
+ delete local $self->{-watch_sync}->{-regen_fmt};
+ reindex_checkpoint($self, $self->{-watch_sync});
+
+ # call event_step => done unless commit_timer is armed
+ PublicInbox::DS::requeue($self);
+}
+
+sub on_inbox_unlock { # called by PublicInbox::InboxIdle
+ my ($self, $ibx) = @_;
+ my $opt = $self->{-watch_sync}->{-opt};
+ my $pr = $opt->{-progress};
+ my $ekey = $ibx->eidx_key;
+ local $0 = "sync $ekey";
+ $pr->("indexing $ekey\n") if $pr;
+ $self->idx_init($opt);
+ sync_inbox($self, $self->{-watch_sync}, $ibx);
+ $self->{-commit_timer} //= PublicInbox::DS::add_timer(
+ $opt->{'commit-interval'} // 10,
+ \&_watch_commit, $self);
+}
+
+sub eidx_reload { # -extindex --watch SIGHUP handler
+ my ($self, $idler) = @_;
+ if ($self->{cfg}) {
+ my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+ $pr->('reloading ...') if $pr;
+ @{$self->{ibx_list}} = ();
+ %{$self->{ibx_map}} = ();
+ delete $self->{-watch_sync}->{id2pos};
+ my $cfg = PublicInbox::Config->new;
+ attach_config($self, $cfg);
+ $idler->refresh($cfg);
+ $pr->(" done\n") if $pr;
+ } else {
+ warn "reload not supported without --all\n";
+ }
+}
+
+sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler
+ my ($self) = @_;
+ $self->{-resync_queue} //= [ @{$self->{ibx_list}} ];
+ PublicInbox::DS::requeue($self); # trigger our ->event_step
+}
+
+sub event_step { # PublicInbox::DS::requeue callback
+ my ($self) = @_;
+ if (my $resync_queue = $self->{-resync_queue}) {
+ if (my $ibx = shift(@$resync_queue)) {
+ on_inbox_unlock($self, $ibx);
+ PublicInbox::DS::requeue($self);
+ } else {
+ delete $self->{-resync_queue};
+ _watch_commit($self);
+ }
+ } else {
+ done($self) unless $self->{-commit_timer};
+ }
+}
+
+sub eidx_watch { # public-inbox-extindex --watch main loop
+ my ($self, $opt) = @_;
+ require PublicInbox::InboxIdle;
+ require PublicInbox::DS;
+ require PublicInbox::Syscall;
+ require PublicInbox::Sigfd;
+ my $idler = PublicInbox::InboxIdle->new($self->{cfg});
+ if (!$self->{cfg}) {
+ $idler->watch_inbox($_) for @{$self->{ibx_list}};
+ }
+ $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}};
+ my $sync = eidx_sync($self, $opt); # initial sync
+ return if $sync->{quit};
+ my $oldset = PublicInbox::Sigfd::block_signals();
+ local $self->{current_info} = '';
+ my $cb = $SIG{__WARN__} || \&CORE::warn;
+ local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+ my $sig = {
+ HUP => sub { eidx_reload($self, $idler) },
+ USR1 => sub { eidx_resync_start($self) },
+ TSTP => sub { kill('STOP', $$) },
+ };
+ my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+ $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
+ my $sigfd = PublicInbox::Sigfd->new($sig,
+ $PublicInbox::Syscall::SFD_NONBLOCK);
+ local %SIG = (%SIG, %$sig) if !$sigfd;
+ local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
+ if (!$sigfd) {
+ # wake up every second to accept signals if we don't
+ # have signalfd or IO::KQueue:
+ PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS->SetLoopTimeout(1000);
+ }
+ PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+ PublicInbox::DS->EventLoop; # calls InboxIdle->event_step
+ done($self);
+}
+
no warnings 'once';
*done = \&PublicInbox::V2Writable::done;
*with_umask = \&PublicInbox::InboxWritable::with_umask;
Create and update external (detached) search indices
--no-fsync speed up indexing, risk corruption on power outage
+ --watch run persistently and watch for inbox updates
-L LEVEL `medium', or `full' (default: full)
--all index all configured inboxes
--jobs=NUM set or disable parallelization (NUM=0)
fsync|sync!
indexlevel|index-level|L=s max_size|max-size=s
batch_size|batch-size=s
- gc
+ gc commit-interval=i watch
all help|h))
or die $help;
if ($opt->{help}) { print $help; exit 0 };
my @ibxs;
if ($opt->{gc}) {
die "E: inbox paths must not be specified with --gc\n" if @ARGV;
- die "E: --all not compatible --gc\n" if $opt->{all};
+ die "E: --all not compatible with --gc\n" if $opt->{all};
+ die "E: --watch is not compatible with --gc\n" if $opt->{watch};
} else {
@ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
}
$eidx->attach_config($cfg);
$eidx->eidx_gc($opt);
} else {
- $eidx->attach_inbox($_) for @ibxs;
- $eidx->eidx_sync($opt);
+ if ($opt->{all}) {
+ $eidx->attach_config($cfg);
+ } else {
+ $eidx->attach_inbox($_) for @ibxs;
+ }
+ if ($opt->{watch}) {
+ $cfg = undef; # save memory only after SIGHUP
+ $eidx->eidx_watch($opt);
+ } else {
+ $eidx->eidx_sync($opt);
+ }
}