X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FExtSearchIdx.pm;h=c2ab0447e17624544a319d7fb8b4ddb28f0f4ace;hb=ed0167d2a851b4f5128f57ad60309a0b76e62cfa;hp=20c4cf7807eae9cb152b9760c9ede22bfdc95595;hpb=8ffed15abbec68463ff3f3105d6651c8a4b5d813;p=public-inbox.git diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 20c4cf78..c2ab0447 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -129,32 +129,72 @@ sub apply_boost ($$) { $req->{self}->{oidx}->add_overview($req->{eml}, $new_smsg); } +sub remove_doc ($$) { + my ($self, $docid) = @_; + $self->{oidx}->delete_by_num($docid); + $self->{oidx}->eidxq_del($docid); + $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); +} + +sub _unref_doc ($$$$$;$) { + my ($sync, $docid, $ibx, $xnum, $oidbin, $eml) = @_; + my $smsg; + if (ref($docid)) { + $smsg = $docid; + $docid = $smsg->{num}; + } + my $s = 'DELETE FROM xref3 WHERE oidbin = ?'; + $s .= ' AND ibx_id = ?' if defined($ibx); + $s .= ' AND xnum = ?' if defined($xnum); + my $del = $sync->{self}->{oidx}->dbh->prepare_cached($s); + my $col = 0; + $del->bind_param(++$col, $oidbin, SQL_BLOB); + $del->bind_param(++$col, $ibx->{-ibx_id}) if $ibx; + $del->bind_param(++$col, $xnum) if defined($xnum); + $del->execute; + my $xr3 = $sync->{self}->{oidx}->get_xref3($docid); + if (scalar(@$xr3) == 0) { # all gone + remove_doc($sync->{self}, $docid); + } else { # enqueue for reindex of remaining messages + if ($ibx) { + my $ekey = $ibx->{-gc_eidx_key} // $ibx->eidx_key; + my $idx = $sync->{self}->idx_shard($docid); + $idx->ipc_do('remove_eidx_info', $docid, $ekey, $eml); + } # else: we can't remove_eidx_info in reindex-only path + + # replace invalidated blob ASAP with something which should be + # readable since we may commit the transaction on checkpoint. + # eidxq processing will re-apply boost + $smsg //= $sync->{self}->{oidx}->get_art($docid); + my $hex = unpack('H*', $oidbin); + if ($smsg && $smsg->{blob} eq $hex) { + $xr3->[0] =~ /:([a-f0-9]{40,}+)\z/ or + die "BUG: xref $xr3->[0] has no OID"; + $sync->{self}->{oidx}->update_blob($smsg, $1); + } + # yes, add, we'll need to re-apply boost + $sync->{self}->{oidx}->eidxq_add($docid); + } + @$xr3 +} + sub do_xpost ($$) { my ($req, $smsg) = @_; my $self = $req->{self}; my $docid = $smsg->{num}; - my $idx = $self->idx_shard($docid); my $oid = $req->{oid}; my $xibx = $req->{ibx}; my $eml = $req->{eml}; - my $eidx_key = $xibx->eidx_key; if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message + my $eidx_key = $xibx->eidx_key; my $xnum = $req->{xnum}; $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key); + my $idx = $self->idx_shard($docid); $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); apply_boost($req, $smsg) if $req->{boost_in_use}; - } else { # 'd' - my $rm_eidx_info; - my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key, - \$rm_eidx_info); - if ($nr == 0) { - $self->{oidx}->eidxq_del($docid); - $idx->ipc_do('xdb_remove', $docid); - } elsif ($rm_eidx_info) { - $idx->ipc_do('remove_eidx_info', - $docid, $eidx_key, $eml); - $self->{oidx}->eidxq_add($docid); # yes, add - } + } else { # 'd' no {xnum} + $oid = pack('H*', $oid); + _unref_doc($req, $docid, $xibx, undef, $oid, $eml); } } @@ -216,24 +256,12 @@ sub do_step ($) { # main iterator for adding messages to the index do_finalize($req); } -sub _blob_missing ($$) { # called when $smsg->{blob} is bad +sub _blob_missing ($$) { # called when a known $smsg->{blob} is gone my ($req, $smsg) = @_; - my $self = $req->{self}; - my $xref3 = $self->{oidx}->get_xref3($smsg->{num}); - my @keep = grep(!/:$smsg->{blob}\z/, @$xref3); - if (@keep) { - warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n"; - $keep[0] =~ /:([a-f0-9]{40,}+)\z/ or - die "BUG: xref $keep[0] has no OID"; - my $oidhex = $1; - $self->{oidx}->remove_xref3($smsg->{num}, $smsg->{blob}); - $self->{oidx}->update_blob($smsg, $oidhex) or warn <{num} gone ($smsg->{blob} => $oidhex) -EOM - } else { - warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n"; - $self->{oidx}->delete_by_num($smsg->{num}); - } + # xnum and ibx are unknown, we only call this when an entry from + # /ei*/over.sqlite3 is bad, not on entries from xap*/over.sqlite3 + my $oidbin = pack('H*', $smsg->{blob}); + _unref_doc($req, $smsg, undef, undef, $oidbin); } sub ck_existing { # git->cat_async callback @@ -345,36 +373,12 @@ sub _sync_inbox ($$$) { undef; } -sub gc_unref_doc ($$$$) { - my ($self, $ibx_id, $eidx_key, $docid) = @_; - my $remain = 0; - # for debug/info purposes, oids may no longer be accessible - my $dbh = $self->{oidx}->dbh; - my $sth = $dbh->prepare_cached(<<'', undef, 1); -SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ? - - $sth->execute($docid, $ibx_id); - my @oid = map { unpack('H*', $_->[0]) } @{$sth->fetchall_arrayref}; - for my $oid (@oid) { - $remain += $self->{oidx}->remove_xref3($docid, $oid, $eidx_key); - } - if ($remain) { - $self->{oidx}->eidxq_add($docid); # enqueue for reindex - for my $oid (@oid) { - warn "I: unref #$docid $eidx_key $oid\n"; - } - } else { - warn "I: remove #$docid $eidx_key @oid\n"; - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); - } -} - sub eidx_gc_scan_inboxes ($$) { my ($self, $sync) = @_; my ($x3_doc, $ibx_ck); restart: $x3_doc = $self->{oidx}->dbh->prepare(<{oidx}->dbh->prepare(<{midx}->remove_eidx_key($eidx_key); warn "I: deleting messages for $eidx_key...\n"; $x3_doc->execute($ibx_id); - while (defined(my $docid = $x3_doc->fetchrow_array)) { - gc_unref_doc($self, $ibx_id, $eidx_key, $docid); + my $ibx = { -ibx_id => $ibx_id, -gc_eidx_key => $eidx_key }; + while (my ($docid, $xnum, $oid) = $x3_doc->fetchrow_array) { + my $r = _unref_doc($sync, $docid, $ibx, $xnum, $oid); + $oid = unpack('H*', $oid); + $r = $r ? 'unref' : 'remove'; + warn "I: $r #$docid $eidx_key $oid\n"; if (checkpoint_due($sync)) { $x3_doc = $ibx_ck = undef; reindex_checkpoint($self, $sync); @@ -421,34 +429,49 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0; + reindex_checkpoint($self, $sync) if checkpoint_due($sync); # fixup from old bugs: $nr = $self->{oidx}->dbh->do(<<''); -DELETE FROM over WHERE num NOT IN (SELECT docid FROM xref3) +DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3) warn "I: eliminated $nr stale over entries\n" if $nr != 0; + reindex_checkpoint($self, $sync) if checkpoint_due($sync); + + $nr = $self->{oidx}->dbh->do(<<''); +DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over) + + warn "I: eliminated $nr stale reindex queue entries\n" if $nr != 0; + reindex_checkpoint($self, $sync) if checkpoint_due($sync); my ($cur) = $self->{oidx}->dbh->selectrow_array(<{oidx}->dbh->selectrow_array(< 0 EOM - my $exists; -restart: - $exists = $self->{oidx}->dbh->prepare(<execute($cur); - next if $exists->fetchrow_array != 0; - $self->idx_shard($cur)->ipc_do('xdb_remove_quiet', $cur); + $cur // return; # empty + my ($r, $n, %active_shards); + $nr = 0; + while (1) { + $r = $self->{oidx}->dbh->selectcol_arrayref(<<"", undef, $cur); +SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000 + + last unless scalar(@$r); + while (defined($n = shift @$r)) { + for my $i ($cur..($n - 1)) { + my $idx = idx_shard($self, $i); + $idx->ipc_do('xdb_remove_quiet', $i); + $active_shards{$idx} = $idx; + } + $cur = $n + 1; + } if (checkpoint_due($sync)) { - $exists = undef; + for my $idx (values %active_shards) { + $nr += $idx->ipc_do('nr_quiet_rm') + } + %active_shards = (); reindex_checkpoint($self, $sync); - goto restart; } } + warn "I: eliminated $nr stale Xapian documents\n" if $nr != 0; } sub eidx_gc { @@ -461,6 +484,7 @@ sub eidx_gc { next_check => now() + 10, checkpoint_unlocks => 1, -opt => $opt, + self => $self, }; $self->idx_init($opt); # acquire lock via V2Writable::_idx_init eidx_gc_scan_inboxes($self, $sync); @@ -533,7 +557,8 @@ sub _reindex_finalize ($$$) { for my $ary (values %$by_chash) { for my $x (reverse @$ary) { warn "removing #$docid xref3 $x->{blob}\n"; - my $n = $self->{oidx}->remove_xref3($docid, $x->{blob}); + my $bin = pack('H*', $x->{blob}); + my $n = _unref_doc($sync, $docid, undef, undef, $bin); die "BUG: $x->{blob} invalidated #$docid" if $n == 0; } my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; @@ -564,15 +589,14 @@ sub _reindex_oid { # git->cat_async callback my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2]; my $docid = $orig_smsg->{num}; if (is_bad_blob($oid, $type, $size, $expect_oid)) { - my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); + my $oidbin = pack('H*', $expect_oid); + my $remain = _unref_doc($sync, $docid, undef, undef, $oidbin); if ($remain == 0) { warn "W: #$docid gone or corrupted\n"; - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { warn "BUG: #$docid gone (UNEXPECTED)\n"; - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); } return; } @@ -605,8 +629,7 @@ sub _reindex_smsg ($$$) { warn <<""; BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex - $self->{oidx}->delete_by_num($docid); - $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); + remove_doc($self, $docid); return; } @@ -798,114 +821,96 @@ sub reindex_unseen ($$$$) { $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req); } -sub _reindex_check_unseen ($$$) { +sub _unref_stale_range ($$$) { + my ($sync, $ibx, $lt_or_gt) = @_; + my $r; + my $lim = 10000; + do { + $r = $sync->{self}->{oidx}->dbh->selectall_arrayref( + <{-ibx_id}); +SELECT docid,xnum,oidbin FROM xref3 +WHERE ibx_id = ? AND xnum $lt_or_gt LIMIT $lim +EOS + return if $sync->{quit}; + for (@$r) { # hopefully rare, not worth optimizing: + my ($docid, $xnum, $oidbin) = @$_; + my $hex = unpack('H*', $oidbin); + warn("# $xnum:$hex (#$docid): stale\n"); + _unref_doc($sync, $docid, $ibx, $xnum, $oidbin); + } + } while (scalar(@$r) == $lim); + 1; +} + +sub _reindex_check_ibx ($$$) { my ($self, $sync, $ibx) = @_; my $ibx_id = $ibx->{-ibx_id}; - my $slice = 1000; + my $slice = 10000; + my $opt = { limit => $slice }; my ($beg, $end) = (1, $slice); + my $err = sync_inbox($self, $sync, $ibx) and return; + my $max = $ibx->over->max; + $end = $max if $end > $max; # first, check if we missed any messages in target $ibx my $msgs; my $pr = $sync->{-opt}->{-progress}; my $ekey = $ibx->eidx_key; - local $sync->{-regen_fmt} = - "$ekey checking unseen %u/".$ibx->over->max."\n"; + local $sync->{-regen_fmt} = "$ekey checking %u/$max\n"; ${$sync->{nr}} = 0; my $fast = $sync->{-opt}->{fast}; - while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) { + my $dsu; # _unref_stale_range (< $lo) called + my ($lo, $hi); + while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end, $opt)})) { ${$sync->{nr}} = $beg; $beg = $msgs->[-1]->{num} + 1; $end = $beg + $slice; + $end = $max if $end > $max; if (checkpoint_due($sync)) { reindex_checkpoint($self, $sync); # release lock } - - my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1); -SELECT DISTINCT(docid) FROM xref3 WHERE -ibx_id = ? AND xnum = ? AND oidbin = ? - + ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num}); + $dsu //= _unref_stale_range($sync, $ibx, "< $lo"); + my $x3a = $self->{oidx}->dbh->selectall_arrayref( + <<"", undef, $ibx_id, $lo, $hi); +SELECT xnum,oidbin,docid FROM xref3 WHERE +ibx_id = ? AND xnum >= ? AND xnum <= ? + + my %x3m; + for (@$x3a) { + my $k = pack('J', $_->[0]) . $_->[1]; + push @{$x3m{$k}}, $_->[2]; + } + undef $x3a; for my $xsmsg (@$msgs) { - my $oidbin = pack('H*', $xsmsg->{blob}); - $inx3->bind_param(1, $ibx_id); - $inx3->bind_param(2, $xsmsg->{num}); - $inx3->bind_param(3, $oidbin, SQL_BLOB); - $inx3->execute; - my $docids = $inx3->fetchall_arrayref; - # index messages which were totally missed - # the first time around ASAP: - if (scalar(@$docids) == 0) { + my $k = pack('JH*', $xsmsg->{num}, $xsmsg->{blob}); + my $docids = delete($x3m{$k}); + if (!defined($docids)) { reindex_unseen($self, $sync, $ibx, $xsmsg); - } elsif (!$fast) { # already seen, reindex later - for my $r (@$docids) { - $self->{oidx}->eidxq_add($r->[0]); + } elsif (!$fast) { + for my $num (@$docids) { + $self->{oidx}->eidxq_add($num); } + return if $sync->{quit}; } - last if $sync->{quit}; } - last if $sync->{quit}; - } -} - -sub _reindex_check_stale ($$$) { - my ($self, $sync, $ibx) = @_; - my $min = 0; - my $pr = $sync->{-opt}->{-progress}; - my $fetching; - my $ekey = $ibx->eidx_key; - local $sync->{-regen_fmt} = - "$ekey checking stale/missing %u/".$ibx->over->max."\n"; - ${$sync->{nr}} = 0; - do { - if (checkpoint_due($sync)) { - reindex_checkpoint($self, $sync); # release lock - } - # now, check if there's stale xrefs - my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1); -SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ? -ORDER BY docid,xnum ASC LIMIT 10000 - - $iter->execute($ibx->{-ibx_id}, $min); - $fetching = undef; - - while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) { - return if $sync->{quit}; - ${$sync->{nr}} = $xnum; - - $fetching = $min = $docid; - my $smsg = $ibx->over->get_art($xnum); - my $err; - if (!$smsg) { - $err = 'stale'; - } elsif (pack('H*', $smsg->{blob}) ne $oidbin) { - $err = "mismatch (!= $smsg->{blob})"; - } else { - next; # likely, all good - } - # current_info already has eidx_key - my $oidhex = unpack('H*', $oidbin); - warn "$xnum:$oidhex (#$docid): $err\n"; - my $del = $self->{oidx}->dbh->prepare_cached(<<''); -DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? - - $del->bind_param(1, $ibx->{-ibx_id}); - $del->bind_param(2, $xnum); - $del->bind_param(3, $oidbin, SQL_BLOB); - $del->execute; - - # get_xref3 over-fetches, but this is a rare path: - my $xr3 = $self->{oidx}->get_xref3($docid); - my $idx = $self->idx_shard($docid); - if (scalar(@$xr3) == 0) { # all gone - $self->{oidx}->delete_by_num($docid); - $self->{oidx}->eidxq_del($docid); - $idx->ipc_do('xdb_remove', $docid); - } else { # enqueue for reindex of remaining messages - $idx->ipc_do('remove_eidx_info', - $docid, $ibx->eidx_key); - $self->{oidx}->eidxq_add($docid); # yes, add + return if $sync->{quit}; + next unless scalar keys %x3m; + + # eliminate stale/mismatched entries + my %mismatch = map { $_->{num} => $_->{blob} } @$msgs; + while (my ($k, $docids) = each %x3m) { + my ($xnum, $hex) = unpack('JH*', $k); + my $bin = pack('H*', $hex); + my $exp = $mismatch{$xnum}; + my $m = defined($exp) ? "mismatch (!= $exp)" : 'stale'; + warn("# $xnum:$hex (#@$docids): $m\n"); + for my $i (@$docids) { + _unref_doc($sync, $i, $ibx, $xnum, $bin); } } - } while (defined $fetching); + } + _unref_stale_range($sync, $ibx, "> $hi") if defined($hi); } sub _reindex_inbox ($$$) { @@ -915,8 +920,7 @@ sub _reindex_inbox ($$$) { if (defined(my $err = _ibx_index_reject($ibx))) { warn "W: cannot reindex $ekey ($err)\n"; } else { - _reindex_check_unseen($self, $sync, $ibx); - _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit}; + _reindex_check_ibx($self, $sync, $ibx); } delete @$ibx{qw(over mm search git)}; # won't need these for a bit } @@ -944,6 +948,7 @@ sub sync_inbox { my $err = _sync_inbox($self, $sync, $ibx); delete @$ibx{qw(mm over)}; warn $err, "\n" if defined($err); + $err; } sub dd_smsg { # git->cat_async callback @@ -971,8 +976,7 @@ sub dd_smsg { # git->cat_async callback for my $smsg (@$ary) { my $gone = $smsg->{num}; $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob}); - $self->idx_shard($gone)->ipc_do('xdb_remove', $gone); - $oidx->delete_by_num($gone); + remove_doc($self, $gone); } } }