]> Sergey Matveev's repositories - public-inbox.git/commitdiff
extindex: implement --dedupe to fix old extindices
authorEric Wong <e@80x24.org>
Tue, 6 Jul 2021 12:42:02 +0000 (12:42 +0000)
committerEric Wong <e@80x24.org>
Tue, 6 Jul 2021 13:36:54 +0000 (13:36 +0000)
This is intended to fix older indices that had deduplication
bugs for matching content.  It'll also make dealing with
future changes to ContentHash easier since that's never
guaranteed stable.

It also supports --dry-run to print changes only without
making them.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/OverIdx.pm
script/public-inbox-extindex
t/extsearch.t

index 29414e4abe69168fbb81f28cd200282593d2d8cc..495579a2b43bede87578228fc82481879862f55c 100644 (file)
@@ -844,6 +844,98 @@ sub sync_inbox {
        warn $err, "\n" if defined($err);
 }
 
+sub dd_smsg { # git->cat_async callback
+       my ($bref, $oid, $type, $size, $dd) = @_;
+       my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+       my $self = $dd->{self} // die 'BUG: {self} missing';
+       my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing';
+       if ($type eq 'missing') {
+               _blob_missing($dd, $smsg);
+       } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+               local $self->{current_info} = "$self->{current_info} $oid";
+               my $chash = content_hash(PublicInbox::Eml->new($bref));
+               push(@{$per_mid->{dd_chash}->{$chash}}, $smsg);
+       }
+       return if $per_mid->{last_smsg} != $smsg;
+       while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) {
+               my $keep = shift @$ary;
+               next if !scalar(@$ary);
+               $per_mid->{sync}->{dedupe_cull} += scalar(@$ary);
+               print STDERR
+                       "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+                       join(', ', map { "#$_->{num}" } @$ary),"\n";
+               next if $per_mid->{sync}->{-opt}->{'dry-run'};
+               my $oidx = $self->{oidx};
+               for my $smsg (@$ary) {
+                       my $gone = $smsg->{num};
+                       $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob});
+                       $self->idx_shard($gone)->ipc_do('xdb_remove', $gone);
+                       $oidx->delete_by_num($gone);
+               }
+       }
+}
+
+sub eidx_dedupe ($$) {
+       my ($self, $sync) = @_;
+       $sync->{dedupe_cull} = 0;
+       my $candidates = 0;
+       my $nr_mid = 0;
+       return unless eidxq_lock_acquire($self);
+       my $iter;
+       my $min_id = 0;
+       local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n";
+dedupe_restart:
+       $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT DISTINCT(mid),id FROM msgid WHERE id IN
+(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
+ORDER BY id
+EOS
+       $iter->execute($min_id);
+       local $SIG{__WARN__} = sub {
+               return if PublicInbox::Eml::warn_ignore(@_);
+               warn @_;
+       };
+       while (my ($mid, $id) = $iter->fetchrow_array) {
+               last if $sync->{quit};
+               $self->{current_info} = "dedupe $mid";
+               ${$sync->{nr}} = $min_id = $id;
+               my ($n, $prv, @smsg);
+               while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+                       push @smsg, $x;
+               }
+               next if scalar(@smsg) < 2;
+               my $per_mid = {
+                       dd_chash => {}, # chash => [ary of smsgs]
+                       last_smsg => $smsg[-1],
+                       sync => $sync
+               };
+               $nr_mid++;
+               $candidates += scalar(@smsg) - 1;
+               for my $smsg (@smsg) {
+                       my $dd = {
+                               per_mid => $per_mid,
+                               smsg => $smsg,
+                               self => $self,
+                       };
+                       $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+               }
+               # need to wait on every single one
+               $self->git->async_wait_all;
+
+               # is checkpoint needed? $iter is a very expensive query to restart
+               if (0 && checkpoint_due($sync)) {
+                       undef $iter;
+                       reindex_checkpoint($self, $sync);
+                       goto dedupe_restart;
+               }
+       }
+       my $n = delete $sync->{dedupe_cull};
+       if (my $pr = $sync->{-opt}->{-progress}) {
+               $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+       }
+       ${$sync->{nr}} = 0;
+}
+
 sub eidx_sync { # main entry point
        my ($self, $opt) = @_;
 
@@ -873,6 +965,10 @@ sub eidx_sync { # main entry point
        for my $ibx (@{$self->{ibx_list}}) {
                $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
        }
+       if (delete($opt->{dedupe})) {
+               local $sync->{checkpoint_unlocks} = 1;
+               eidx_dedupe($self, $sync);
+       }
        if (delete($opt->{reindex})) {
                local $sync->{checkpoint_unlocks} = 1;
                eidx_reindex($self, $sync);
index 5f96a5b099ff0a692b5aa33f9f2994bcaec9e678..8f7cf2bb97228b49f4b653ee94ecfa90d74dc3f4 100644 (file)
@@ -656,6 +656,26 @@ UPDATE over SET ddd = ? WHERE num = ?
        $sth->execute;
 }
 
+sub merge_xref3 { # used for "-extindex --dedupe"
+       my ($self, $keep_docid, $drop_docid, $oidhex) = @_;
+       my $oidbin = pack('H*', $oidhex);
+       my $sth = $self->{dbh}->prepare_cached(<<'');
+UPDATE OR IGNORE xref3 SET docid = ? WHERE docid = ? AND oidbin = ?
+
+       $sth->bind_param(1, $keep_docid);
+       $sth->bind_param(2, $drop_docid);
+       $sth->bind_param(3, $oidbin, SQL_BLOB);
+       $sth->execute;
+
+       # drop anything that conflicted
+       $sth = $self->{dbh}->prepare_cached(<<'');
+DELETE FROM xref3 WHERE docid = ? AND oidbin = ?
+
+       $sth->bind_param(1, $drop_docid);
+       $sth->bind_param(2, $oidbin, SQL_BLOB);
+       $sth->execute;
+}
+
 sub eidxq_add {
        my ($self, $docid) = @_;
        $self->dbh->prepare_cached(<<'')->execute($docid);
index 771486c425b315bae70fd8a82d62ab0331e0a827..dcb12e5ad55490b29d8af5a423259cd4c7752cc6 100755 (executable)
@@ -17,7 +17,9 @@ usage: public-inbox-extindex [options] [EXTINDEX_DIR] [INBOX_DIR...]
   --batch-size=BYTES  flush changes to OS after a given number of bytes
   --max-size=BYTES    do not index messages larger than the given size
   --gc                perform garbage collection instead of indexing
+  --dedupe            fix prior deduplication errors
   --verbose | -v      increase verbosity (may be repeated)
+  --dry-run | -n      dry-run on --dedupe
 
 BYTES may use `k', `m', and `g' suffixes (e.g. `10m' for 10 megabytes)
 See public-inbox-extindex(1) man page for full documentation.
@@ -27,7 +29,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i
                fsync|sync!
                indexlevel|index-level|L=s max_size|max-size=s
                batch_size|batch-size=s
-               gc commit-interval=i watch scan!
+               dedupe gc commit-interval=i watch scan! dry-run|n
                all help|h))
        or die $help;
 if ($opt->{help}) { print $help; exit 0 };
@@ -50,11 +52,16 @@ unless (defined $eidx_dir) {
 my @ibxs;
 if ($opt->{gc}) {
        die "E: inbox paths must not be specified with --gc\n" if @ARGV;
-       die "E: --all not compatible with --gc\n" if $opt->{all};
-       die "E: --watch is not compatible with --gc\n" if $opt->{watch};
+       for my $sw (qw(all watch dry-run dedupe)) {
+               die "E: --$sw is not compatible with --gc\n" if $opt->{$sw};
+       }
 } else {
        @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
 }
+if ($opt->{'dry-run'} && !$opt->{dedupe}) {
+       die "E: --dry-run only affects --dedupe\n";
+}
+
 PublicInbox::Admin::require_or_die(qw(-search));
 PublicInbox::Config::json() or die "Cpanel::JSON::XS or similar missing\n";
 PublicInbox::Admin::progress_prepare($opt);
index ae889ac67837969c214c98d8fdc5652460308f0f..5f0cd8662bfc0a102bbc32223e4c42a0fa199a30 100644 (file)
@@ -368,4 +368,15 @@ if ('remove v1test and test gc') {
        is(scalar(@it), 1, 'only one inbox left');
 }
 
+if ('dedupe + dry-run') {
+       my @cmd = ('-extindex', "$home/extindex");
+       my $opt = { 2 => \(my $err = '') };
+       ok(run_script([@cmd, '--dedupe'], undef, $opt), '--dedupe');
+       ok(run_script([@cmd, qw(--dedupe --dry-run)], undef, $opt),
+               '--dry-run --dedupe');
+       is $err, '', 'no errors';
+       ok(!run_script([@cmd, qw(--dry-run)], undef, $opt),
+               '--dry-run alone fails');
+}
+
 done_testing;