+ $err;
+}
+
+sub dd_smsg { # git->cat_async callback
+ my ($bref, $oid, $type, $size, $dd) = @_;
+ my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+ my $self = $dd->{self} // die 'BUG: {self} missing';
+ my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing';
+ if ($type eq 'missing') {
+ _blob_missing($dd, $smsg);
+ } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+ local $self->{current_info} = "$self->{current_info} $oid";
+ my $chash = content_hash(PublicInbox::Eml->new($bref));
+ push(@{$per_mid->{dd_chash}->{$chash}}, $smsg);
+ }
+ return if $per_mid->{last_smsg} != $smsg;
+ while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) {
+ my $keep = shift @$ary;
+ next if !scalar(@$ary);
+ $per_mid->{sync}->{dedupe_cull} += scalar(@$ary);
+ print STDERR
+ "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+ join(', ', map { "#$_->{num}" } @$ary),"\n";
+ next if $per_mid->{sync}->{-opt}->{'dry-run'};
+ my $oidx = $self->{oidx};
+ for my $smsg (@$ary) {
+ my $gone = $smsg->{num};
+ $oidx->merge_xref3($keep->{num}, $gone, $smsg->oidbin);
+ remove_doc($self, $gone);
+ }
+ }
+}
+
+sub eidx_dedupe ($$$) {
+ my ($self, $sync, $msgids) = @_;
+ $sync->{dedupe_cull} = 0;
+ my $candidates = 0;
+ my $nr_mid = 0;
+ return unless eidxq_lock_acquire($self);
+ my ($iter, $cur_mid);
+ my $min_id = 0;
+ my $idx = 0;
+ my ($max_id) = $self->{oidx}->dbh->selectrow_array(<<EOS);
+SELECT MAX(id) FROM msgid
+EOS
+ local $sync->{-regen_fmt} = "dedupe %u/$max_id\n";
+
+ # note: we could write this query more intelligently,
+ # but that causes lock contention with read-only processes
+dedupe_restart:
+ $cur_mid = $msgids->[$idx];
+ if ($cur_mid eq '') { # all Message-IDs
+ $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT mid,id FROM msgid WHERE id > ? ORDER BY id ASC
+EOS
+ $iter->execute($min_id);
+ } else {
+ $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT mid,id FROM msgid WHERE mid = ? AND id > ? ORDER BY id ASC
+EOS
+ $iter->execute($cur_mid, $min_id);
+ }
+ while (my ($mid, $id) = $iter->fetchrow_array) {
+ last if $sync->{quit};
+ $self->{current_info} = "dedupe $mid";
+ ${$sync->{nr}} = $min_id = $id;
+ my ($prv, @smsg);
+ while (my $x = $self->{oidx}->next_by_mid($mid, \$id, \$prv)) {
+ push @smsg, $x;
+ }
+ next if scalar(@smsg) < 2;
+ my $per_mid = {
+ dd_chash => {}, # chash => [ary of smsgs]
+ last_smsg => $smsg[-1],
+ sync => $sync
+ };
+ $nr_mid++;
+ $candidates += scalar(@smsg) - 1;
+ for my $smsg (@smsg) {
+ my $dd = {
+ per_mid => $per_mid,
+ smsg => $smsg,
+ self => $self,
+ };
+ $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+ }
+ # need to wait on every single one @smsg contents can get
+ # invalidated inside dd_smsg for messages with multiple
+ # Message-IDs.
+ $self->git->async_wait_all;
+
+ if (checkpoint_due($sync)) {
+ undef $iter;
+ reindex_checkpoint($self, $sync);
+ goto dedupe_restart;
+ }
+ }
+ goto dedupe_restart if defined($msgids->[++$idx]);
+
+ my $n = delete $sync->{dedupe_cull};
+ if (my $pr = $sync->{-opt}->{-progress}) {
+ $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+ }
+ ${$sync->{nr}} = 0;