X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=blobdiff_plain;f=lib%2FPublicInbox%2FExtSearchIdx.pm;h=a2d70205c9cdb2a7faac5b977ecd3246de9974e2;hp=568960562812e56500ebaee9dc28a03447dd683c;hb=0c8106d44f317175e122744b43407bf067183175;hpb=9d29ceda4eb8c9973749d74b928416f5c3cc78f8 diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 56896056..a2d70205 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -30,13 +30,11 @@ use PublicInbox::V2Writable; use PublicInbox::InboxWritable; use PublicInbox::ContentHash qw(content_hash); use PublicInbox::Eml; -use File::Spec; use PublicInbox::DS qw(now); use DBI qw(:sql_types); # SQL_BLOB sub new { my (undef, $dir, $opt) = @_; - $dir = File::Spec->canonpath($dir); my $l = $opt->{indexlevel} // 'full'; $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and die "invalid indexlevel=$l\n"; @@ -56,28 +54,14 @@ sub new { }, __PACKAGE__; $self->{shards} = $self->count_shards || nproc_shards($opt->{creat}); my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3"); - $oidx->{-no_fsync} = 1 if $opt->{-no_fsync}; + $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync}; $self->{oidx} = $oidx; $self } sub attach_inbox { my ($self, $ibx) = @_; - my $key = $ibx->eidx_key; - if (!$ibx->over || !$ibx->mm) { - warn "W: skipping $key (unindexed)\n"; - return; - } - if (!defined($ibx->uidvalidity)) { - warn "W: skipping $key (no UIDVALIDITY)\n"; - return; - } - my $ibxdir = File::Spec->canonpath($ibx->{inboxdir}); - if ($ibxdir ne $ibx->{inboxdir}) { - warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n"; - $ibx->{inboxdir} = $ibxdir; - } - $self->{ibx_map}->{$key} //= do { + $self->{ibx_map}->{$ibx->eidx_key} //= do { push @{$self->{ibx_list}}, $ibx; $ibx; } @@ -281,29 +265,36 @@ sub last_commits { $heads; } +sub _ibx_index_reject ($) { + my ($ibx) = @_; + $ibx->mm // return 'unindexed, no msgmap.sqlite3'; + $ibx->uidvalidity // return 'no UIDVALIDITY'; + $ibx->over // return 'unindexed, no over.sqlite3'; + undef; +} + sub _sync_inbox ($$$) { my ($self, $sync, $ibx) = @_; + my $ekey = $ibx->eidx_key; + if (defined(my $err = _ibx_index_reject($ibx))) { + return "W: skipping $ekey ($err)"; + } $sync->{ibx} = $ibx; $sync->{nr} = \(my $nr = 0); my $v = $ibx->version; - my $ekey = $ibx->eidx_key; if ($v == 2) { $sync->{epoch_max} = $ibx->max_git_epoch // return; sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable } elsif ($v == 1) { my $uv = $ibx->uidvalidity; my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv"); - my $head = $ibx->mm->last_commit; - unless (defined $head) { - warn "E: $ibx->{inboxdir} is not indexed\n"; - return; - } + my $head = $ibx->mm->last_commit // + return "E: $ibx->{inboxdir} is not indexed"; my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head); my $unit = { stack => $stk, git => $ibx->git }; push @{$sync->{todo}}, $unit; } else { - warn "E: $ekey unsupported inbox version (v$v)\n"; - return; + return "E: $ekey unsupported inbox version (v$v)"; } for my $unit (@{delete($sync->{todo}) // []}) { last if $sync->{quit}; @@ -311,6 +302,7 @@ sub _sync_inbox ($$$) { } $self->{midx}->index_ibx($ibx) unless $sync->{quit}; $ibx->git->cleanup; # done with this inbox, now + undef; } sub gc_unref_doc ($$$$) { @@ -401,6 +393,32 @@ sub _ibx_for ($$$) { $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" } +sub _fd_constrained ($) { + my ($self) = @_; + $self->{-fd_constrained} //= do { + my $soft; + if (eval { require BSD::Resource; 1 }) { + my $NOFILE = BSD::Resource::RLIMIT_NOFILE(); + ($soft, undef) = BSD::Resource::getrlimit($NOFILE); + } else { + chomp($soft = `sh -c 'ulimit -n'`); + } + if (defined($soft)) { + my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate + my $ret = $want > $soft; + if ($ret) { + warn <{sync}; @@ -437,11 +455,16 @@ sub _reindex_finalize ($$$) { my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; $ibx = _ibx_for($self, $sync, $x); - my $e = $ibx->over->get_art($x->{num}); - $e->{blob} eq $x->{blob} or die <over) { + my $e = $over->get_art($x->{num}); + $e->{blob} eq $x->{blob} or die <{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num}); EOF - push @todo, $ibx, $e; + push @todo, $ibx, $e; + $over->dbh_close if _fd_constrained($self); + } else { + die "$ibx->{inboxdir}: over.sqlite3 unusable: $!\n"; + } } undef $by_chash; while (my ($ibx, $e) = splice(@todo, 0, 2)) { @@ -607,7 +630,7 @@ sub eidxq_process ($$) { # for reindexing my $dbh = $self->{oidx}->dbh; my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; ${$sync->{nr}} = 0; - $sync->{-regen_fmt} = "%u/$tot\n"; + local $sync->{-regen_fmt} = "%u/$tot\n"; my $pr = $sync->{-opt}->{-progress}; if ($pr) { my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq'); @@ -686,7 +709,8 @@ sub _reindex_check_unseen ($$$) { my $msgs; my $pr = $sync->{-opt}->{-progress}; my $ekey = $ibx->eidx_key; - $sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n"; + local $sync->{-regen_fmt} = + "$ekey checking unseen %u/".$ibx->over->max."\n"; ${$sync->{nr}} = 0; while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { @@ -729,7 +753,7 @@ sub _reindex_check_stale ($$$) { my $pr = $sync->{-opt}->{-progress}; my $fetching; my $ekey = $ibx->eidx_key; - $sync->{-regen_fmt} = + local $sync->{-regen_fmt} = "$ekey check stale/missing %u/".$ibx->over->max."\n"; ${$sync->{nr}} = 0; do { @@ -787,9 +811,14 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? sub _reindex_inbox ($$$) { my ($self, $sync, $ibx) = @_; - local $self->{current_info} = $ibx->eidx_key; - _reindex_check_unseen($self, $sync, $ibx); - _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit}; + my $ekey = $ibx->eidx_key; + local $self->{current_info} = $ekey; + if (defined(my $err = _ibx_index_reject($ibx))) { + warn "W: cannot reindex $ekey ($err)\n"; + } else { + _reindex_check_unseen($self, $sync, $ibx); + _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit}; + } delete @$ibx{qw(over mm search git)}; # won't need these for a bit } @@ -810,10 +839,17 @@ sub eidx_reindex { eidxq_process($self, $sync) unless $sync->{quit}; } +sub sync_inbox { + my ($self, $sync, $ibx) = @_; + my $err = _sync_inbox($self, $sync, $ibx); + delete @$ibx{qw(mm over)}; + warn $err, "\n" if defined($err); +} + sub eidx_sync { # main entry point my ($self, $opt) = @_; - my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; local $self->{current_info} = ''; local $SIG{__WARN__} = sub { $warn_cb->($self->{current_info}, ': ', @_); @@ -840,20 +876,23 @@ sub eidx_sync { # main entry point $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); } if (delete($opt->{reindex})) { - $sync->{checkpoint_unlocks} = 1; + local $sync->{checkpoint_unlocks} = 1; eidx_reindex($self, $sync); } # don't use $_ here, it'll get clobbered by reindex_checkpoint - for my $ibx (@{$self->{ibx_list}}) { - last if $sync->{quit}; - _sync_inbox($self, $sync, $ibx); + if ($opt->{scan} // 1) { + for my $ibx (@{$self->{ibx_list}}) { + last if $sync->{quit}; + sync_inbox($self, $sync, $ibx); + } } $self->{oidx}->rethread_done($opt) unless $sync->{quit}; eidxq_process($self, $sync) unless $sync->{quit}; eidxq_release($self); - PublicInbox::V2Writable::done($self); + done($self); + $sync; # for eidx_watch } sub update_last_commit { # overrides V2Writable @@ -963,16 +1002,125 @@ sub idx_init { # similar to V2Writable PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o); } $self->parallel_init($self->{indexlevel}); - $self->umask_prepare; $self->with_umask(\&_idx_init, $self, $opt); $self->{oidx}->begin_lazy; $self->{oidx}->eidx_prep; $self->{midx}->begin_txn; } +sub _watch_commit { # PublicInbox::DS::add_timer callback + my ($self) = @_; + delete $self->{-commit_timer}; + eidxq_process($self, $self->{-watch_sync}); + eidxq_release($self); + delete local $self->{-watch_sync}->{-regen_fmt}; + reindex_checkpoint($self, $self->{-watch_sync}); + + # call event_step => done unless commit_timer is armed + PublicInbox::DS::requeue($self); +} + +sub on_inbox_unlock { # called by PublicInbox::InboxIdle + my ($self, $ibx) = @_; + my $opt = $self->{-watch_sync}->{-opt}; + my $pr = $opt->{-progress}; + my $ekey = $ibx->eidx_key; + local $0 = "sync $ekey"; + $pr->("indexing $ekey\n") if $pr; + $self->idx_init($opt); + sync_inbox($self, $self->{-watch_sync}, $ibx); + $self->{-commit_timer} //= PublicInbox::DS::add_timer( + $opt->{'commit-interval'} // 10, + \&_watch_commit, $self); +} + +sub eidx_reload { # -extindex --watch SIGHUP handler + my ($self, $idler) = @_; + if ($self->{cfg}) { + my $pr = $self->{-watch_sync}->{-opt}->{-progress}; + $pr->('reloading ...') if $pr; + delete $self->{-resync_queue}; + @{$self->{ibx_list}} = (); + %{$self->{ibx_map}} = (); + delete $self->{-watch_sync}->{id2pos}; + my $cfg = PublicInbox::Config->new; + attach_config($self, $cfg); + $idler->refresh($cfg); + $pr->(" done\n") if $pr; + } else { + warn "reload not supported without --all\n"; + } +} + +sub eidx_resync_start ($) { # -extindex --watch SIGUSR1 handler + my ($self) = @_; + $self->{-resync_queue} //= [ @{$self->{ibx_list}} ]; + PublicInbox::DS::requeue($self); # trigger our ->event_step +} + +sub event_step { # PublicInbox::DS::requeue callback + my ($self) = @_; + if (my $resync_queue = $self->{-resync_queue}) { + if (my $ibx = shift(@$resync_queue)) { + on_inbox_unlock($self, $ibx); + PublicInbox::DS::requeue($self); + } else { + delete $self->{-resync_queue}; + _watch_commit($self); + } + } else { + done($self) unless $self->{-commit_timer}; + } +} + +sub eidx_watch { # public-inbox-extindex --watch main loop + my ($self, $opt) = @_; + local %SIG = %SIG; + for my $sig (qw(HUP USR1 TSTP QUIT INT TERM)) { + $SIG{$sig} = sub { warn "SIG$sig ignored while scanning\n" }; + } + require PublicInbox::InboxIdle; + require PublicInbox::DS; + require PublicInbox::Syscall; + require PublicInbox::Sigfd; + my $idler = PublicInbox::InboxIdle->new($self->{cfg}); + if (!$self->{cfg}) { + $idler->watch_inbox($_) for @{$self->{ibx_list}}; + } + $_->subscribe_unlock(__PACKAGE__, $self) for @{$self->{ibx_list}}; + my $pr = $opt->{-progress}; + $pr->("performing initial scan ...\n") if $pr; + my $sync = eidx_sync($self, $opt); # initial sync + return if $sync->{quit}; + my $oldset = PublicInbox::Sigfd::block_signals(); + local $self->{current_info} = ''; + my $cb = $SIG{__WARN__} || \&CORE::warn; + local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) }; + my $sig = { + HUP => sub { eidx_reload($self, $idler) }, + USR1 => sub { eidx_resync_start($self) }, + TSTP => sub { kill('STOP', $$) }, + }; + my $quit = PublicInbox::SearchIdx::quit_cb($sync); + $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; + my $sigfd = PublicInbox::Sigfd->new($sig, + $PublicInbox::Syscall::SFD_NONBLOCK); + %SIG = (%SIG, %$sig) if !$sigfd; + local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock + if (!$sigfd) { + # wake up every second to accept signals if we don't + # have signalfd or IO::KQueue: + PublicInbox::Sigfd::sig_setmask($oldset); + PublicInbox::DS->SetLoopTimeout(1000); + } + PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); + $pr->("initial scan complete, entering event loop\n") if $pr; + PublicInbox::DS->EventLoop; # calls InboxIdle->event_step + done($self); +} + no warnings 'once'; *done = \&PublicInbox::V2Writable::done; -*umask_prepare = \&PublicInbox::InboxWritable::umask_prepare; *with_umask = \&PublicInbox::InboxWritable::with_umask; *parallel_init = \&PublicInbox::V2Writable::parallel_init; *nproc_shards = \&PublicInbox::V2Writable::nproc_shards;