warn $err, "\n" if defined($err);
}
+sub dd_smsg { # git->cat_async callback: bucket one smsg by content hash, cull dups on the last
+ my ($bref, $oid, $type, $size, $dd) = @_; # $dd is the hashref passed as the 3rd cat_async arg
+ my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+ my $self = $dd->{self} // die 'BUG: {self} missing';
+ my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing'; # shared by all smsgs of one Message-ID
+ if ($type eq 'missing') { # no blob to hash, hand off to _blob_missing
+ _blob_missing($dd, $smsg);
+ } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+ local $self->{current_info} = "$self->{current_info} $oid"; # OID appended only inside this branch
+ my $chash = content_hash(PublicInbox::Eml->new($bref));
+ push(@{$per_mid->{dd_chash}->{$chash}}, $smsg); # same content_hash => same bucket
+ }
+ return if $per_mid->{last_smsg} != $smsg; # refs compared by address; assumes the last queued smsg calls back last -- TODO confirm
+ while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) { # $ary: smsgs with identical content_hash
+ my $keep = shift @$ary; # first smsg pushed into the bucket survives
+ next if !scalar(@$ary); # bucket had a single smsg, nothing to drop
+ $per_mid->{sync}->{dedupe_cull} += scalar(@$ary); # counted even on dry-run
+ print STDERR
+ "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+ join(', ', map { "#$_->{num}" } @$ary),"\n";
+ next if $per_mid->{sync}->{-opt}->{'dry-run'}; # report only, leave the index untouched
+ my $oidx = $self->{oidx};
+ for my $smsg (@$ary) { # this $smsg shadows the callback's $smsg
+ my $gone = $smsg->{num};
+ $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob}); # repoint the xref3 row for this blob at the kept docid
+ $self->idx_shard($gone)->ipc_do('xdb_remove', $gone); # dispatch 'xdb_remove' to the shard chosen for $gone
+ $oidx->delete_by_num($gone); # runs after merge_xref3 has repointed the xref3 row
+ }
+ }
+}
+
+sub eidx_dedupe ($$) { # queue every smsg of each candidate Message-ID to dd_smsg, then report totals
+ my ($self, $sync) = @_;
+ $sync->{dedupe_cull} = 0; # running count of dropped smsgs, incremented by dd_smsg
+ my $candidates = 0; # smsgs beyond the first for each Message-ID examined
+ my $nr_mid = 0; # Message-IDs that had at least two smsgs
+ return unless eidxq_lock_acquire($self); # no work is done when the lock is not acquired
+ my $iter;
+ my $min_id = 0; # last msgid.id seen; the query only returns ids above it
+ local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n"; # NOTE(review): nr below holds a msgid.id -- confirm oidx->max is on the same scale
+dedupe_restart:
+ $iter = $self->{oidx}->dbh->prepare(<<EOS); # NOTE(review): subquery groups by num, not id -- confirm it selects Message-IDs having more than one num
+SELECT DISTINCT(mid),id FROM msgid WHERE id IN
+(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
+ORDER BY id
+EOS
+ $iter->execute($min_id);
+ local $SIG{__WARN__} = sub { # drop what warn_ignore accepts, re-emit everything else
+ return if PublicInbox::Eml::warn_ignore(@_);
+ warn @_;
+ };
+ while (my ($mid, $id) = $iter->fetchrow_array) {
+ last if $sync->{quit}; # stop early once the quit flag is set
+ $self->{current_info} = "dedupe $mid";
+ ${$sync->{nr}} = $min_id = $id; # progress counter and resume point advance together
+ my ($n, $prv, @smsg); # $n and $prv are iteration state handed to next_by_mid
+ while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+ push @smsg, $x;
+ }
+ next if scalar(@smsg) < 2; # a lone smsg has nothing to be compared against
+ my $per_mid = { # state shared by every dd_smsg call for this $mid
+ dd_chash => {}, # chash => [ary of smsgs]
+ last_smsg => $smsg[-1], # dd_smsg starts culling when it is called for this one
+ sync => $sync
+ };
+ $nr_mid++;
+ $candidates += scalar(@smsg) - 1;
+ for my $smsg (@smsg) {
+ my $dd = { # per-smsg argument for dd_smsg
+ per_mid => $per_mid,
+ smsg => $smsg,
+ self => $self,
+ };
+ $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+ }
+ # need to wait on every single one
+ $self->git->async_wait_all;
+
+ # is checkpoint needed? $iter is a very expensive query to restart
+ if (0 && checkpoint_due($sync)) { # constant 0 keeps this branch from ever running
+ undef $iter;
+ reindex_checkpoint($self, $sync);
+ goto dedupe_restart; # would re-prepare the query starting after $min_id
+ }
+ }
+ my $n = delete $sync->{dedupe_cull}; # fetch the total and remove the key from $sync
+ if (my $pr = $sync->{-opt}->{-progress}) {
+ $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+ }
+ ${$sync->{nr}} = 0; # reset the shared progress counter
+}
+
sub eidx_sync { # main entry point
my ($self, $opt) = @_;
for my $ibx (@{$self->{ibx_list}}) {
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
+ if (delete($opt->{dedupe})) {
+ local $sync->{checkpoint_unlocks} = 1;
+ eidx_dedupe($self, $sync);
+ }
if (delete($opt->{reindex})) {
local $sync->{checkpoint_unlocks} = 1;
eidx_reindex($self, $sync);
$sth->execute;
}
+sub merge_xref3 { # used for "-extindex --dedupe"
+ my ($self, $keep_docid, $drop_docid, $oidhex) = @_;
+ # both statements match on the binary form of the hex object ID
+ my $bin = pack('H*', $oidhex);
+ my $dbh = $self->{dbh};
+
+ # step 1: repoint the xref3 row for this blob from the dropped docid
+ # to the kept one; "OR IGNORE" skips the change when it would conflict
+ my $move = $dbh->prepare_cached(<<'');
+UPDATE OR IGNORE xref3 SET docid = ? WHERE docid = ? AND oidbin = ?
+
+ $move->bind_param(1, $keep_docid);
+ $move->bind_param(2, $drop_docid);
+ $move->bind_param(3, $bin, SQL_BLOB);
+ $move->execute;
+
+ # step 2: a row still filed under the dropped docid was skipped
+ # above because it conflicted, so remove it
+ my $purge = $dbh->prepare_cached(<<'');
+DELETE FROM xref3 WHERE docid = ? AND oidbin = ?
+
+ $purge->bind_param(1, $drop_docid);
+ $purge->bind_param(2, $bin, SQL_BLOB);
+ $purge->execute;
+}
+
sub eidxq_add {
my ($self, $docid) = @_;
$self->dbh->prepare_cached(<<'')->execute($docid);
--batch-size=BYTES flush changes to OS after a given number of bytes
--max-size=BYTES do not index messages larger than the given size
--gc perform garbage collection instead of indexing
+ --dedupe fix prior deduplication errors
--verbose | -v increase verbosity (may be repeated)
+ --dry-run | -n dry-run on --dedupe
BYTES may use `k', `m', and `g' suffixes (e.g. `10m' for 10 megabytes)
See public-inbox-extindex(1) man page for full documentation.
fsync|sync!
indexlevel|index-level|L=s max_size|max-size=s
batch_size|batch-size=s
- gc commit-interval=i watch scan!
+ dedupe gc commit-interval=i watch scan! dry-run|n
all help|h))
or die $help;
if ($opt->{help}) { print $help; exit 0 };
my @ibxs;
if ($opt->{gc}) {
 die "E: inbox paths must not be specified with --gc\n" if @ARGV;
- die "E: --all not compatible with --gc\n" if $opt->{all};
- die "E: --watch is not compatible with --gc\n" if $opt->{watch};
+ for my $sw (qw(all watch dry-run dedupe)) { # switches refused in combination with --gc
+ die "E: --$sw is not compatible with --gc\n" if $opt->{$sw};
+ }
} else {
 @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg);
}
+if ($opt->{'dry-run'} && !$opt->{dedupe}) { # refuse --dry-run unless --dedupe is also given
+ die "E: --dry-run only affects --dedupe\n";
+}
+
+
PublicInbox::Admin::require_or_die(qw(-search));
PublicInbox::Config::json() or die "Cpanel::JSON::XS or similar missing\n";
PublicInbox::Admin::progress_prepare($opt);
is(scalar(@it), 1, 'only one inbox left');
}
+if ('dedupe + dry-run') { # always-true string literal, used as a label for this group of checks
+ my @cmd = ('-extindex', "$home/extindex");
+ my $opt = { 2 => \(my $err = '') }; # presumably redirects fd 2 (stderr) of the script into $err -- TODO confirm
+ ok(run_script([@cmd, '--dedupe'], undef, $opt), '--dedupe');
+ ok(run_script([@cmd, qw(--dedupe --dry-run)], undef, $opt),
+ '--dry-run --dedupe');
+ is $err, '', 'no errors'; # checked before the run below, which is expected to fail
+ ok(!run_script([@cmd, qw(--dry-run)], undef, $opt), # --dry-run without --dedupe must fail
+ '--dry-run alone fails');
+}
+
done_testing;