X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FExtSearchIdx.pm;h=7c44a1a406308604c1319b91eaa7130a0be645d2;hb=3eec2f7792040f75f3988c520f308e2445baf645;hp=c0fd282358f97f6c8368acf9536f1d43c9282371;hpb=eb4e8d1d90077ddf49a920ac9719b7787bcfd7a0;p=public-inbox.git diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index c0fd2823..7c44a1a4 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Detached/external index cross inbox search indexing support @@ -18,6 +18,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::ExtSearch PublicInbox::Lock); use Carp qw(croak carp); +use Scalar::Util qw(blessed); use Sys::Hostname qw(hostname); use POSIX qw(strftime); use File::Glob qw(bsd_glob GLOB_NOSORT); @@ -58,6 +59,7 @@ sub new { nproc_shards({ nproc => $opt->{jobs} }); my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3"); $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync}; + $self->{-dangerous} = 1 if $opt->{dangerous}; $self->{oidx} = $oidx; $self } @@ -121,7 +123,7 @@ sub apply_boost ($$) { $a->[1] <=> $b->[1] # break ties with {xnum} } @$xr3; my $new_smsg = $req->{new_smsg}; - return if $xr3->[0]->[2] ne pack('H*', $new_smsg->{blob}); # loser + return if $xr3->[0]->[2] ne $new_smsg->oidbin; # loser # replace the old smsg with the more boosted one $new_smsg->{num} = $smsg->{num}; @@ -129,25 +131,59 @@ sub apply_boost ($$) { $req->{self}->{oidx}->add_overview($req->{eml}, $new_smsg); } +sub remove_doc ($$) { + my ($self, $docid) = @_; + $self->{oidx}->delete_by_num($docid); + $self->{oidx}->eidxq_del($docid); + $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); +} + sub _unref_doc ($$$$$;$) { my ($sync, $docid, $ibx, $xnum, $oidbin, $eml) = @_; - my $s = 'DELETE FROM xref3 WHERE ibx_id = ? AND oidbin = ?'; + my $smsg; + if (ref($docid)) { + $smsg = $docid; + $docid = $smsg->{num}; + } + if (defined($oidbin) && defined($xnum) && blessed($ibx) && $ibx->over) { + my $smsg = $ibx->over->get_art($xnum); + if ($smsg && $smsg->oidbin eq $oidbin) { + carp("BUG: (non-fatal) ".$ibx->eidx_key. + " #$xnum $smsg->{blob} still valid"); + return; + } + } + my $s = 'DELETE FROM xref3 WHERE oidbin = ?'; + $s .= ' AND ibx_id = ?' if defined($ibx); $s .= ' AND xnum = ?' if defined($xnum); my $del = $sync->{self}->{oidx}->dbh->prepare_cached($s); - $del->bind_param(1, $ibx->{-ibx_id}); - $del->bind_param(2, $oidbin, SQL_BLOB); - $del->bind_param(3, $xnum) if defined($xnum); + my $col = 0; + $del->bind_param(++$col, $oidbin, SQL_BLOB); + $del->bind_param(++$col, $ibx->{-ibx_id}) if $ibx; + $del->bind_param(++$col, $xnum) if defined($xnum); $del->execute; - my $xr3 = $sync->{self}->{oidx}->get_xref3($docid, 1); - my $idx = $sync->{self}->idx_shard($docid); + my $xr3 = $sync->{self}->{oidx}->get_xref3($docid); if (scalar(@$xr3) == 0) { # all gone - $sync->{self}->{oidx}->delete_by_num($docid); - $sync->{self}->{oidx}->eidxq_del($docid); - $idx->ipc_do('xdb_remove', $docid); + remove_doc($sync->{self}, $docid); } else { # enqueue for reindex of remaining messages - my $ekey = $ibx->{-gc_eidx_key} // $ibx->eidx_key; - $idx->ipc_do('remove_eidx_info', $docid, $ekey, $eml); - $sync->{self}->{oidx}->eidxq_add($docid); # yes, add + if ($ibx) { + my $ekey = $ibx->{-gc_eidx_key} // $ibx->eidx_key; + my $idx = $sync->{self}->idx_shard($docid); + $idx->ipc_do('remove_eidx_info', $docid, $ekey, $eml); + } # else: we can't remove_eidx_info in reindex-only path + + # replace invalidated blob ASAP with something which should be + # readable since we may commit the transaction on checkpoint. + # eidxq processing will re-apply boost + $smsg //= $sync->{self}->{oidx}->get_art($docid); + my $hex = unpack('H*', $oidbin); + if ($smsg && $smsg->{blob} eq $hex) { + $xr3->[0] =~ /:([a-f0-9]{40,}+)\z/ or + die "BUG: xref $xr3->[0] has no OID"; + $sync->{self}->{oidx}->update_blob($smsg, $1); + } + # yes, add, we'll need to re-apply boost + $sync->{self}->{oidx}->eidxq_add($docid); } @$xr3 } @@ -167,6 +203,7 @@ sub do_xpost ($$) { $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); apply_boost($req, $smsg) if $req->{boost_in_use}; } else { # 'd' no {xnum} + $self->git->async_wait_all; $oid = pack('H*', $oid); _unref_doc($req, $docid, $xibx, undef, $oid, $eml); } @@ -230,24 +267,12 @@ sub do_step ($) { # main iterator for adding messages to the index do_finalize($req); } -sub _blob_missing ($$) { # called when $smsg->{blob} is bad +sub _blob_missing ($$) { # called when a known $smsg->{blob} is gone my ($req, $smsg) = @_; - my $self = $req->{self}; - my $xref3 = $self->{oidx}->get_xref3($smsg->{num}); - my @keep = grep(!/:$smsg->{blob}\z/, @$xref3); - if (@keep) { - warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n"; - $keep[0] =~ /:([a-f0-9]{40,}+)\z/ or - die "BUG: xref $keep[0] has no OID"; - my $oidhex = $1; - $self->{oidx}->remove_xref3($smsg->{num}, $smsg->{blob}); - $self->{oidx}->update_blob($smsg, $oidhex) or warn <{num} gone ($smsg->{blob} => $oidhex) -EOM - } else { - warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n"; - $self->{oidx}->delete_by_num($smsg->{num}); - } + # xnum and ibx are unknown, we only call this when an entry from + # /ei*/over.sqlite3 is bad, not on entries from xap*/over.sqlite3 + $req->{self}->git->async_wait_all; + _unref_doc($req, $smsg, undef, undef, $smsg->oidbin); } sub ck_existing { # git->cat_async callback @@ -268,8 +293,8 @@ sub ck_existing { # git->cat_async callback # is the messages visible in the inbox currently being indexed? # return the number if so -sub cur_ibx_xnum ($$) { - my ($req, $bref) = @_; +sub cur_ibx_xnum ($$;$) { + my ($req, $bref, $mismatch) = @_; my $ibx = $req->{ibx} or die 'BUG: current {ibx} missing'; $req->{eml} = PublicInbox::Eml->new($bref); @@ -279,6 +304,7 @@ sub cur_ibx_xnum ($$) { my ($id, $prev); while (my $x = $ibx->over->next_by_mid($mid, \$id, \$prev)) { return $x->{num} if $x->{blob} eq $req->{oid}; + push @$mismatch, $x if $mismatch; } } undef; @@ -293,8 +319,15 @@ sub index_oid { # git->cat_async callback for 'm' blob => $oid, }, 'PublicInbox::Smsg'; $new_smsg->set_bytes($$bref, $size); - defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return; ++${$req->{nr}}; + my $mismatch = []; + $req->{xnum} = cur_ibx_xnum($req, $bref, $mismatch) // do { + warn "# deleted\n"; + warn "# mismatch $_->{blob}\n" for @$mismatch; + ${$req->{latest_cmt}} = $req->{cur_cmt} // + die "BUG: {cur_cmt} unset ($oid)\n"; + return; + }; do_step($req); } @@ -424,6 +457,12 @@ DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3) warn "I: eliminated $nr stale over entries\n" if $nr != 0; reindex_checkpoint($self, $sync) if checkpoint_due($sync); + $nr = $self->{oidx}->dbh->do(<<''); +DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over) + + warn "I: eliminated $nr stale reindex queue entries\n" if $nr != 0; + reindex_checkpoint($self, $sync) if checkpoint_due($sync); + my ($cur) = $self->{oidx}->dbh->selectrow_array(< 0 EOM @@ -532,12 +571,14 @@ sub _reindex_finalize ($$$) { } return if $nr == 1; # likely, all good + $self->git->async_wait_all; warn "W: #$docid split into $nr due to deduplication change\n"; my @todo; for my $ary (values %$by_chash) { for my $x (reverse @$ary) { warn "removing #$docid xref3 $x->{blob}\n"; - my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); + my $bin = $x->oidbin; + my $n = _unref_doc($sync, $docid, undef, undef, $bin); die "BUG: $x->{blob} invalidated #$docid" if $n == 0; } my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; @@ -568,15 +609,14 @@ sub _reindex_oid { # git->cat_async callback my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2]; my $docid = $orig_smsg->{num}; if (is_bad_blob($oid, $type, $size, $expect_oid)) { - my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); + my $oidbin = pack('H*', $expect_oid); + my $remain = _unref_doc($sync, $docid, undef, undef, $oidbin); if ($remain == 0) { - warn "W: #$docid gone or corrupted\n"; - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); + warn "W: #$docid ($oid) gone or corrupt\n"; } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { - warn "BUG: #$docid gone (UNEXPECTED)\n"; - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); + warn "BUG: #$docid ($oid) gone (UNEXPECTED)\n"; } return; } @@ -609,8 +649,7 @@ sub _reindex_smsg ($$$) { warn <<""; BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex - $self->{oidx}->delete_by_num($docid); - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); + remove_doc($self, $docid); return; } @@ -689,11 +728,12 @@ sub eidxq_lock_acquire ($) { return $locked if $locked eq $cur; } my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4); - my $t = strftime('%Y-%m-%d %k:%M:%S', gmtime($time)); + my $t = strftime('%Y-%m-%d %k:%M %z', localtime($time)); + local $self->{current_info} = 'eidxq'; if ($euid == $> && $ident eq host_ident) { if (kill(0, $pid)) { warn <{cfg}; - - return unless eidxq_lock_acquire($self); + local $self->{current_info} = 'eidxq process'; + return unless ($self->{cfg} && eidxq_lock_acquire($self)); my $dbh = $self->{oidx}->dbh; my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; ${$sync->{nr}} = 0; @@ -810,7 +849,7 @@ sub _unref_stale_range ($$$) { $r = $sync->{self}->{oidx}->dbh->selectall_arrayref( <{-ibx_id}); SELECT docid,xnum,oidbin FROM xref3 -WHERE ibx_id = ? AND xnum $lt_or_gt LIMIT $lim +WHERE ibx_id = ? AND $lt_or_gt LIMIT $lim EOS return if $sync->{quit}; for (@$r) { # hopefully rare, not worth optimizing: @@ -829,18 +868,24 @@ sub _reindex_check_ibx ($$$) { my $slice = 10000; my $opt = { limit => $slice }; my ($beg, $end) = (1, $slice); - my $err = sync_inbox($self, $sync, $ibx) and return; - my $max = $ibx->over->max; + my $ekey = $ibx->eidx_key; + my ($max, $max0); + do { + $max0 = $ibx->mm->num_highwater; + sync_inbox($self, $sync, $ibx) and return; # warned + $max = $ibx->mm->num_highwater; + return if $sync->{quit}; + } while ($max > $max0 && + warn("# $ekey moved $max0..$max, resyncing..\n")); $end = $max if $end > $max; # first, check if we missed any messages in target $ibx my $msgs; my $pr = $sync->{-opt}->{-progress}; - my $ekey = $ibx->eidx_key; local $sync->{-regen_fmt} = "$ekey checking %u/$max\n"; ${$sync->{nr}} = 0; my $fast = $sync->{-opt}->{fast}; - my $dsu; # _unref_stale_range (< $lo) called + my $usr; # _unref_stale_range (< $lo) called my ($lo, $hi); while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end, $opt)})) { ${$sync->{nr}} = $beg; @@ -851,7 +896,7 @@ sub _reindex_check_ibx ($$$) { reindex_checkpoint($self, $sync); # release lock } ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num}); - $dsu //= _unref_stale_range($sync, $ibx, "< $lo"); + $usr //= _unref_stale_range($sync, $ibx, "xnum < $lo"); my $x3a = $self->{oidx}->dbh->selectall_arrayref( <<"", undef, $ibx_id, $lo, $hi); SELECT xnum,oidbin,docid FROM xref3 WHERE @@ -872,11 +917,11 @@ ibx_id = ? AND xnum >= ? AND xnum <= ? for my $num (@$docids) { $self->{oidx}->eidxq_add($num); } - return if $sync->{quit}; } + return if $sync->{quit}; } - return if $sync->{quit}; next unless scalar keys %x3m; + $self->git->async_wait_all; # wait for reindex_unseen # eliminate stale/mismatched entries my %mismatch = map { $_->{num} => $_->{blob} } @$msgs; @@ -884,14 +929,26 @@ ibx_id = ? AND xnum >= ? AND xnum <= ? my ($xnum, $hex) = unpack('JH*', $k); my $bin = pack('H*', $hex); my $exp = $mismatch{$xnum}; + if (defined $exp) { + my $smsg = $ibx->over->get_art($xnum) // next; + # $xnum may be expired by another process + if ($smsg->{blob} eq $hex) { + warn <<""; +BUG: (non-fatal) $ekey #$xnum $smsg->{blob} still matches (old exp: $exp) + + next; + } # else: continue to unref + } my $m = defined($exp) ? "mismatch (!= $exp)" : 'stale'; warn("# $xnum:$hex (#@$docids): $m\n"); for my $i (@$docids) { _unref_doc($sync, $i, $ibx, $xnum, $bin); } + return if $sync->{quit}; } } - _unref_stale_range($sync, $ibx, "> $hi") if defined($hi); + defined($hi) and ($hi < $max) and + _unref_stale_range($sync, $ibx, "xnum > $hi AND xnum <= $max"); } sub _reindex_inbox ($$$) { @@ -956,9 +1013,8 @@ sub dd_smsg { # git->cat_async callback my $oidx = $self->{oidx}; for my $smsg (@$ary) { my $gone = $smsg->{num}; - $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob}); - $self->idx_shard($gone)->ipc_do('xdb_remove', $gone); - $oidx->delete_by_num($gone); + $oidx->merge_xref3($keep->{num}, $gone, $smsg->oidbin); + remove_doc($self, $gone); } } } @@ -1163,7 +1219,7 @@ sub idx_init { # similar to V2Writable $self->git->cleanup; my $mode = 0644; my $ALL = $self->git->{git_dir}; # topdir/ALL.git - my ($has_new, $alt, $seen); + my ($has_new, $alt, $seen, $prune, $prune_nr); if ($opt->{-private}) { # LeiStore my $local = "$self->{topdir}/local"; # lei/store $self->{mg} //= PublicInbox::MultiGit->new($self->{topdir}, @@ -1179,8 +1235,8 @@ sub idx_init { # similar to V2Writable } else { # extindex has no epochs $self->{mg} //= PublicInbox::MultiGit->new($self->{topdir}, 'ALL.git'); - ($alt, $seen) = $self->{mg}->read_alternates(\$mode, - $opt->{-idx_gc}); + $prune = $opt->{-idx_gc} ? \$prune_nr : undef; + ($alt, $seen) = $self->{mg}->read_alternates(\$mode, $prune); PublicInbox::Import::init_bare($ALL); } @@ -1214,7 +1270,7 @@ sub idx_init { # similar to V2Writable } $new .= "$d\n"; } - ($has_new || $new ne '') and + ($has_new || $prune_nr || $new ne '') and $self->{mg}->write_alternates($mode, $alt, $new); $git_midx and $self->with_umask(sub { my @cmd = ('multi-pack-index');