]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
extindex: speed up --reindex --fast
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index c34225b29d9a5249f89f5f7fc036b4d2666caf4a..8da98ba44a9a2d1ad48929ac116559bf6b591a76 100644 (file)
@@ -89,6 +89,7 @@ sub attach_config {
                        $self->{ibx_map}->{$ibx->eidx_key} //= do {
                                push @{$self->{ibx_active}}, $ibx;
                                push @{$self->{ibx_known}}, $ibx;
+                               $ibx;
                        }
                }
                # invalidate cache
@@ -119,9 +120,8 @@ sub apply_boost ($$) {
                                ||
                $a->[1] <=> $b->[1] # break ties with {xnum}
        } @$xr3;
-       my $top_blob = unpack('H*', $xr3->[0]->[2]);
        my $new_smsg = $req->{new_smsg};
-       return if $top_blob ne $new_smsg->{blob}; # loser
+       return if $xr3->[0]->[2] ne pack('H*', $new_smsg->{blob}); # loser
 
        # replace the old smsg with the more boosted one
        $new_smsg->{num} = $smsg->{num};
@@ -222,13 +222,16 @@ sub _blob_missing ($$) { # called when $smsg->{blob} is bad
        my $xref3 = $self->{oidx}->get_xref3($smsg->{num});
        my @keep = grep(!/:$smsg->{blob}\z/, @$xref3);
        if (@keep) {
+               warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n";
                $keep[0] =~ /:([a-f0-9]{40,}+)\z/ or
                        die "BUG: xref $keep[0] has no OID";
                my $oidhex = $1;
                $self->{oidx}->remove_xref3($smsg->{num}, $smsg->{blob});
-               my $upd = $self->{oidx}->update_blob($smsg, $oidhex);
-               my $saved = $self->{oidx}->get_art($smsg->{num});
+               $self->{oidx}->update_blob($smsg, $oidhex) or warn <<EOM;
+E: #$smsg->{num} gone ($smsg->{blob} => $oidhex)
+EOM
        } else {
+               warn "E: $smsg->{blob} gone, removing #$smsg->{num}\n";
                $self->{oidx}->delete_by_num($smsg->{num});
        }
 }
@@ -366,19 +369,16 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ?
        }
 }
 
-sub eidx_gc {
-       my ($self, $opt) = @_;
-       $self->{cfg} or die "E: GC requires ->attach_config\n";
-       $opt->{-idx_gc} = 1;
-       $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
-
-       my $dbh = $self->{oidx}->dbh;
-       $dbh->do('PRAGMA case_sensitive_like = ON'); # only place we use LIKE
-       my $x3_doc = $dbh->prepare('SELECT docid FROM xref3 WHERE ibx_id = ?');
-       my $ibx_ck = $dbh->prepare('SELECT ibx_id,eidx_key FROM inboxes');
-       my $lc_i = $dbh->prepare(<<'');
-SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ?
-
+sub eidx_gc_scan_inboxes ($$) {
+       my ($self, $sync) = @_;
+       my ($x3_doc, $ibx_ck);
+restart:
+       $x3_doc = $self->{oidx}->dbh->prepare(<<EOM);
+SELECT docid FROM xref3 WHERE ibx_id = ?
+EOM
+       $ibx_ck = $self->{oidx}->dbh->prepare(<<EOM);
+SELECT ibx_id,eidx_key FROM inboxes
+EOM
        $ibx_ck->execute;
        while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) {
                next if $self->{ibx_map}->{$eidx_key};
@@ -387,44 +387,93 @@ SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ?
                $x3_doc->execute($ibx_id);
                while (defined(my $docid = $x3_doc->fetchrow_array)) {
                        gc_unref_doc($self, $ibx_id, $eidx_key, $docid);
+                       if (checkpoint_due($sync)) {
+                               $x3_doc = $ibx_ck = undef;
+                               reindex_checkpoint($self, $sync);
+                               goto restart;
+                       }
                }
-               $dbh->prepare_cached(<<'')->execute($ibx_id);
+               $self->{oidx}->dbh->do(<<'', undef, $ibx_id);
 DELETE FROM inboxes WHERE ibx_id = ?
 
                # drop last_commit info
                my $pat = $eidx_key;
                $pat =~ s/([_%\\])/\\$1/g;
+               $self->{oidx}->dbh->do('PRAGMA case_sensitive_like = ON');
+               my $lc_i = $self->{oidx}->dbh->prepare(<<'');
+SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ?
+
                $lc_i->execute("lc-%:$pat//%", '\\');
                while (my ($key) = $lc_i->fetchrow_array) {
                        next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!;
                        warn "I: removing $key\n";
-                       $dbh->prepare_cached(<<'')->execute($key);
+                       $self->{oidx}->dbh->do(<<'', undef, $key);
 DELETE FROM eidx_meta WHERE key = ?
 
                }
-
                warn "I: $eidx_key removed\n";
        }
+}
 
-       # it's not real unless it's in `over', we use parallelism here,
-       # shards will be reading directly from over, so commit
-       $self->{oidx}->commit_lazy;
-       $self->{oidx}->begin_lazy;
-
-       for my $idx (@{$self->{idx_shards}}) {
-               warn "I: cleaning up shard #$idx->{shard}\n";
-               $idx->shard_over_check($self->{oidx});
-       }
-       my $nr = $dbh->do(<<'');
+sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store
+       my ($self, $sync) = @_;
+       my $nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 
        warn "I: eliminated $nr stale xref3 entries\n" if $nr != 0;
+       reindex_checkpoint($self, $sync) if checkpoint_due($sync);
 
        # fixup from old bugs:
-       $nr = $dbh->do(<<'');
-DELETE FROM over WHERE num NOT IN (SELECT docid FROM xref3)
+       $nr = $self->{oidx}->dbh->do(<<'');
+DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3)
 
        warn "I: eliminated $nr stale over entries\n" if $nr != 0;
+       reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+
+       my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM);
+SELECT MIN(num) FROM over WHERE num > 0
+EOM
+       $cur // return; # empty
+       my ($r, $n, %active);
+       $nr = 0;
+       while (1) {
+               $r = $self->{oidx}->dbh->selectcol_arrayref(<<"", undef, $cur);
+SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000
+
+               last unless scalar(@$r);
+               while (defined($n = shift @$r)) {
+                       for my $i ($cur..($n - 1)) {
+                               my $idx = idx_shard($self, $i);
+                               $idx->ipc_do('xdb_remove_quiet', $i);
+                               $active{$idx} = $idx;
+                       }
+                       $cur = $n + 1;
+               }
+               if (checkpoint_due($sync)) {
+                       for my $idx (values %active) {
+                               $nr += $idx->ipc_do('nr_quiet_rm')
+                       }
+                       %active = ();
+                       reindex_checkpoint($self, $sync);
+               }
+       }
+       warn "I: eliminated $nr stale Xapian documents\n" if $nr != 0;
+}
+
+sub eidx_gc {
+       my ($self, $opt) = @_;
+       $self->{cfg} or die "E: GC requires ->attach_config\n";
+       $opt->{-idx_gc} = 1;
+       my $sync = {
+               need_checkpoint => \(my $need_checkpoint = 0),
+               check_intvl => 10,
+               next_check => now() + 10,
+               checkpoint_unlocks => 1,
+               -opt => $opt,
+       };
+       $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
+       eidx_gc_scan_inboxes($self, $sync);
+       eidx_gc_scan_shards($self, $sync);
        done($self);
 }
 
@@ -688,6 +737,7 @@ sub prep_id2pos ($) {
 
 sub eidxq_process ($$) { # for reindexing
        my ($self, $sync) = @_;
+       return unless $self->{cfg};
 
        return unless eidxq_lock_acquire($self);
        my $dbh = $self->{oidx}->dbh;
@@ -757,114 +807,117 @@ sub reindex_unseen ($$$$) {
        $self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req);
 }
 
-sub _reindex_check_unseen ($$$) {
+sub _unref_stale ($$$$$) {
+       my ($sync, $docid, $ibx, $xnum, $oidbin) = @_;
+       my $del = $sync->{self}->{oidx}->dbh->prepare_cached(<<'');
+DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+       $del->bind_param(1, $ibx->{-ibx_id});
+       $del->bind_param(2, $xnum);
+       $del->bind_param(3, $oidbin, SQL_BLOB);
+       $del->execute;
+       my $xr3 = $sync->{self}->{oidx}->get_xref3($docid, 1);
+       my $idx = $sync->{self}->idx_shard($docid);
+       if (scalar(@$xr3) == 0) { # all gone
+               $sync->{self}->{oidx}->delete_by_num($docid);
+               $sync->{self}->{oidx}->eidxq_del($docid);
+               $idx->ipc_do('xdb_remove', $docid);
+       } else { # enqueue for reindex of remaining messages
+               $idx->ipc_do('remove_eidx_info', $docid, $ibx->eidx_key);
+               $sync->{self}->{oidx}->eidxq_add($docid); # yes, add
+       }
+}
+
+sub _unref_stale_range ($$$) {
+       my ($sync, $ibx, $lt_or_gt) = @_;
+       my $r;
+       my $lim = 10000;
+       do {
+               $r = $sync->{self}->{oidx}->dbh->selectall_arrayref(
+                       <<EOS, undef, $ibx->{-ibx_id});
+SELECT docid,xnum,oidbin FROM xref3
+WHERE ibx_id = ? AND xnum $lt_or_gt LIMIT $lim
+EOS
+               return if $sync->{quit};
+               for (@$r) { # hopefully rare, not worth optimizing:
+                       my ($docid, $xnum, $oidbin) = @$_;
+                       my $hex = unpack('H*', $oidbin);
+                       warn("# $xnum:$hex (#$docid): stale\n");
+                       _unref_stale($sync, $docid, $ibx, $xnum, $oidbin);
+               }
+       } while (scalar(@$r) == $lim);
+       1;
+}
+
+sub _reindex_check_ibx ($$$) {
        my ($self, $sync, $ibx) = @_;
        my $ibx_id = $ibx->{-ibx_id};
-       my $slice = 1000;
+       my $slice = 10000;
+       my $opt = { limit => $slice };
        my ($beg, $end) = (1, $slice);
+       my $err = sync_inbox($self, $sync, $ibx) and return;
+       my $max = $ibx->over->max;
+       $end = $max if $end > $max;
 
        # first, check if we missed any messages in target $ibx
        my $msgs;
        my $pr = $sync->{-opt}->{-progress};
        my $ekey = $ibx->eidx_key;
-       local $sync->{-regen_fmt} =
-                       "$ekey checking unseen %u/".$ibx->over->max."\n";
+       local $sync->{-regen_fmt} = "$ekey checking %u/$max\n";
        ${$sync->{nr}} = 0;
-
-       while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+       my $fast = $sync->{-opt}->{fast};
+       my $dsu; # _unref_stale_range (< $lo) called
+       my ($lo, $hi);
+       while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end, $opt)})) {
                ${$sync->{nr}} = $beg;
                $beg = $msgs->[-1]->{num} + 1;
                $end = $beg + $slice;
+               $end = $max if $end > $max;
                if (checkpoint_due($sync)) {
                        reindex_checkpoint($self, $sync); # release lock
                }
-
-               my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
-SELECT DISTINCT(docid) FROM xref3 WHERE
-ibx_id = ? AND xnum = ? AND oidbin = ?
-
+               ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
+               $dsu //= _unref_stale_range($sync, $ibx, "< $lo");
+               my $x3a = $self->{oidx}->dbh->selectall_arrayref(
+                       <<"", undef, $ibx_id, $lo, $hi);
+SELECT xnum,oidbin,docid FROM xref3 WHERE
+ibx_id = ? AND xnum >= ? AND xnum <= ?
+
+               my %x3m;
+               for (@$x3a) {
+                       my $k = pack('J', $_->[0]) . $_->[1];
+                       push @{$x3m{$k}}, $_->[2];
+               }
+               undef $x3a;
                for my $xsmsg (@$msgs) {
-                       my $oidbin = pack('H*', $xsmsg->{blob});
-                       $inx3->bind_param(1, $ibx_id);
-                       $inx3->bind_param(2, $xsmsg->{num});
-                       $inx3->bind_param(3, $oidbin, SQL_BLOB);
-                       $inx3->execute;
-                       my $docids = $inx3->fetchall_arrayref;
-                       # index messages which were totally missed
-                       # the first time around ASAP:
-                       if (scalar(@$docids) == 0) {
+                       my $k = pack('JH*', $xsmsg->{num}, $xsmsg->{blob});
+                       my $docids = delete($x3m{$k});
+                       if (!defined($docids)) {
                                reindex_unseen($self, $sync, $ibx, $xsmsg);
-                       } else { # already seen, reindex later
-                               for my $r (@$docids) {
-                                       $self->{oidx}->eidxq_add($r->[0]);
+                       } elsif (!$fast) {
+                               for my $num (@$docids) {
+                                       $self->{oidx}->eidxq_add($num);
                                }
+                               return if $sync->{quit};
                        }
-                       last if $sync->{quit};
                }
-               last if $sync->{quit};
-       }
-}
-
-sub _reindex_check_stale ($$$) {
-       my ($self, $sync, $ibx) = @_;
-       my $min = 0;
-       my $pr = $sync->{-opt}->{-progress};
-       my $fetching;
-       my $ekey = $ibx->eidx_key;
-       local $sync->{-regen_fmt} =
-                       "$ekey check stale/missing %u/".$ibx->over->max."\n";
-       ${$sync->{nr}} = 0;
-       do {
-               if (checkpoint_due($sync)) {
-                       reindex_checkpoint($self, $sync); # release lock
-               }
-               # now, check if there's stale xrefs
-               my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
-SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
-ORDER BY docid,xnum ASC LIMIT 10000
-
-               $iter->execute($ibx->{-ibx_id}, $min);
-               $fetching = undef;
-
-               while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
-                       return if $sync->{quit};
-                       ${$sync->{nr}} = $xnum;
-
-                       $fetching = $min = $docid;
-                       my $smsg = $ibx->over->get_art($xnum);
-                       my $err;
-                       if (!$smsg) {
-                               $err = 'stale';
-                       } elsif (pack('H*', $smsg->{blob}) ne $oidbin) {
-                               $err = "mismatch (!= $smsg->{blob})";
-                       } else {
-                               next; # likely, all good
-                       }
-                       # current_info already has eidx_key
-                       my $oidhex = unpack('H*', $oidbin);
-                       warn "$xnum:$oidhex (#$docid): $err\n";
-                       my $del = $self->{oidx}->dbh->prepare_cached(<<'');
-DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
-
-                       $del->bind_param(1, $ibx->{-ibx_id});
-                       $del->bind_param(2, $xnum);
-                       $del->bind_param(3, $oidbin, SQL_BLOB);
-                       $del->execute;
-
-                       # get_xref3 over-fetches, but this is a rare path:
-                       my $xr3 = $self->{oidx}->get_xref3($docid);
-                       my $idx = $self->idx_shard($docid);
-                       if (scalar(@$xr3) == 0) { # all gone
-                               $self->{oidx}->delete_by_num($docid);
-                               $self->{oidx}->eidxq_del($docid);
-                               $idx->ipc_do('xdb_remove', $docid);
-                       } else { # enqueue for reindex of remaining messages
-                               $idx->ipc_do('remove_eidx_info',
-                                               $docid, $ibx->eidx_key);
-                               $self->{oidx}->eidxq_add($docid); # yes, add
+               return if $sync->{quit};
+               next unless scalar keys %x3m;
+
+               # eliminate stale/mismatched entries
+               my %mismatch = map { $_->{num} => $_->{blob} } @$msgs;
+               while (my ($k, $docids) = each %x3m) {
+                       my ($xnum, $hex) = unpack('JH*', $k);
+                       my $bin = pack('H*', $hex);
+                       my $exp = $mismatch{$xnum};
+                       my $m = defined($exp) ? "mismatch (!= $exp)" : 'stale';
+                       warn("# $xnum:$hex (#@$docids): $m\n");
+                       for my $i (@$docids) {
+                               _unref_stale($sync, $i, $ibx, $xnum, $bin);
                        }
                }
-       } while (defined $fetching);
+       }
+       _unref_stale_range($sync, $ibx, "> $hi") if defined($hi);
 }
 
 sub _reindex_inbox ($$$) {
@@ -874,14 +927,14 @@ sub _reindex_inbox ($$$) {
        if (defined(my $err = _ibx_index_reject($ibx))) {
                warn "W: cannot reindex $ekey ($err)\n";
        } else {
-               _reindex_check_unseen($self, $sync, $ibx);
-               _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+               _reindex_check_ibx($self, $sync, $ibx);
        }
        delete @$ibx{qw(over mm search git)}; # won't need these for a bit
 }
 
 sub eidx_reindex {
        my ($self, $sync) = @_;
+       return unless $self->{cfg};
 
        # acquire eidxq_lock early because full reindex takes forever
        # and incremental -extindex processes can run during our checkpoints
@@ -902,6 +955,7 @@ sub sync_inbox {
        my $err = _sync_inbox($self, $sync, $ibx);
        delete @$ibx{qw(mm over)};
        warn $err, "\n" if defined($err);
+       $err;
 }
 
 sub dd_smsg { # git->cat_async callback