X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FExtSearchIdx.pm;h=7cc8dd952559d17b3c5c600a7ac8155547ad34ce;hb=89193578d21f;hp=6b29789a2ed89d33d12e654d6a95f9bbe65cc8fa;hpb=28d5a8d647e3ab56cc5570af0d6f3ccf75dc91f2;p=public-inbox.git diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 6b29789a..7cc8dd95 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -89,6 +89,7 @@ sub attach_config { $self->{ibx_map}->{$ibx->eidx_key} //= do { push @{$self->{ibx_active}}, $ibx; push @{$self->{ibx_known}}, $ibx; + $ibx; } } # invalidate cache @@ -119,9 +120,8 @@ sub apply_boost ($$) { || $a->[1] <=> $b->[1] # break ties with {xnum} } @$xr3; - my $top_blob = unpack('H*', $xr3->[0]->[2]); my $new_smsg = $req->{new_smsg}; - return if $top_blob ne $new_smsg->{blob}; # loser + return if $xr3->[0]->[2] ne pack('H*', $new_smsg->{blob}); # loser # replace the old smsg with the more boosted one $new_smsg->{num} = $smsg->{num}; @@ -222,13 +222,16 @@ sub _blob_missing ($$) { # called when $smsg->{blob} is bad my $xref3 = $self->{oidx}->get_xref3($smsg->{num}); my @keep = grep(!/:$smsg->{blob}\z/, @$xref3); if (@keep) { + warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n"; $keep[0] =~ /:([a-f0-9]{40,}+)\z/ or die "BUG: xref $keep[0] has no OID"; my $oidhex = $1; $self->{oidx}->remove_xref3($smsg->{num}, $smsg->{blob}); - my $upd = $self->{oidx}->update_blob($smsg, $oidhex); - my $saved = $self->{oidx}->get_art($smsg->{num}); + $self->{oidx}->update_blob($smsg, $oidhex) or warn <{num} gone ($smsg->{blob} => $oidhex) +EOM } else { + warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n"; $self->{oidx}->delete_by_num($smsg->{num}); } } @@ -366,19 +369,16 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ? } } -sub eidx_gc { - my ($self, $opt) = @_; - $self->{cfg} or die "E: GC requires ->attach_config\n"; - $opt->{-idx_gc} = 1; - $self->idx_init($opt); # acquire lock via V2Writable::_idx_init - - my $dbh = $self->{oidx}->dbh; - $dbh->do('PRAGMA case_sensitive_like = ON'); # only place we use LIKE - my $x3_doc = $dbh->prepare('SELECT docid FROM xref3 WHERE ibx_id = ?'); - my $ibx_ck = $dbh->prepare('SELECT ibx_id,eidx_key FROM inboxes'); - my $lc_i = $dbh->prepare(<<''); -SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ? - +sub eidx_gc_scan_inboxes ($$) { + my ($self, $sync) = @_; + my ($x3_doc, $ibx_ck); +restart: + $x3_doc = $self->{oidx}->dbh->prepare(<{oidx}->dbh->prepare(<execute; while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) { next if $self->{ibx_map}->{$eidx_key}; @@ -387,44 +387,84 @@ SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ? $x3_doc->execute($ibx_id); while (defined(my $docid = $x3_doc->fetchrow_array)) { gc_unref_doc($self, $ibx_id, $eidx_key, $docid); + if (checkpoint_due($sync)) { + $x3_doc = $ibx_ck = undef; + reindex_checkpoint($self, $sync); + goto restart; + } } - $dbh->prepare_cached(<<'')->execute($ibx_id); + $self->{oidx}->dbh->do(<<'', undef, $ibx_id); DELETE FROM inboxes WHERE ibx_id = ? # drop last_commit info my $pat = $eidx_key; $pat =~ s/([_%\\])/\\$1/g; + $self->{oidx}->dbh->do('PRAGMA case_sensitive_like = ON'); + my $lc_i = $self->{oidx}->dbh->prepare(<<''); +SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ? + $lc_i->execute("lc-%:$pat//%", '\\'); while (my ($key) = $lc_i->fetchrow_array) { next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!; warn "I: removing $key\n"; - $dbh->prepare_cached(<<'')->execute($key); + $self->{oidx}->dbh->do(<<'', undef, $key); DELETE FROM eidx_meta WHERE key = ? } - warn "I: $eidx_key removed\n"; } +} - # it's not real unless it's in `over', we use parallelism here, - # shards will be reading directly from over, so commit - $self->{oidx}->commit_lazy; - $self->{oidx}->begin_lazy; - - for my $idx (@{$self->{idx_shards}}) { - warn "I: cleaning up shard #$idx->{shard}\n"; - $idx->shard_over_check($self->{oidx}); - } - my $nr = $dbh->do(<<''); +sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store + my ($self, $sync) = @_; + my $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0; # fixup from old bugs: - $nr = $dbh->do(<<''); + $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM over WHERE num NOT IN (SELECT docid FROM xref3) warn "I: eliminated $nr stale over entries\n" if $nr != 0; + + my ($cur) = $self->{oidx}->dbh->selectrow_array(<{oidx}->dbh->selectrow_array(<{oidx}->dbh->prepare(<execute($cur); + next if $exists->fetchrow_array != 0; + $self->idx_shard($cur)->ipc_do('xdb_remove_quiet', $cur); + if (checkpoint_due($sync)) { + $exists = undef; + reindex_checkpoint($self, $sync); + goto restart; + } + } +} + +sub eidx_gc { + my ($self, $opt) = @_; + $self->{cfg} or die "E: GC requires ->attach_config\n"; + $opt->{-idx_gc} = 1; + my $sync = { + need_checkpoint => \(my $need_checkpoint = 0), + check_intvl => 10, + next_check => now() + 10, + checkpoint_unlocks => 1, + -opt => $opt, + }; + $self->idx_init($opt); # acquire lock via V2Writable::_idx_init + eidx_gc_scan_inboxes($self, $sync); + eidx_gc_scan_shards($self, $sync); done($self); } @@ -688,6 +728,7 @@ sub prep_id2pos ($) { sub eidxq_process ($$) { # for reindexing my ($self, $sync) = @_; + return unless $self->{cfg}; return unless eidxq_lock_acquire($self); my $dbh = $self->{oidx}->dbh; @@ -882,6 +923,7 @@ sub _reindex_inbox ($$$) { sub eidx_reindex { my ($self, $sync) = @_; + return unless $self->{cfg}; # acquire eidxq_lock early because full reindex takes forever # and incremental -extindex processes can run during our checkpoints @@ -1305,19 +1347,11 @@ sub eidx_watch { # public-inbox-extindex --watch main loop }; my $quit = PublicInbox::SearchIdx::quit_cb($sync); $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; - my $sigfd = PublicInbox::Sigfd->new($sig, - $PublicInbox::Syscall::SFD_NONBLOCK); - @SIG{keys %$sig} = values(%$sig) if !$sigfd; local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock - if (!$sigfd) { - # wake up every second to accept signals if we don't - # have signalfd or IO::KQueue: - PublicInbox::DS::sig_setmask($oldset); - PublicInbox::DS->SetLoopTimeout(1000); - } PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; - PublicInbox::DS->EventLoop; # calls InboxIdle->event_step + # calls InboxIdle->event_step: + PublicInbox::DS::event_loop($sig, $oldset); done($self); }