]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
extsearchidx: ignore Eml warnings across the board
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index ee43e6f8f79f7e4de10b33a061c3dfc6eaf0f8ac..a421e16bb91a1899cbd4ff1b6bef2f7f86b2b3e5 100644 (file)
@@ -139,7 +139,7 @@ sub index_unseen ($) {
 
 sub do_finalize ($) {
        my ($req) = @_;
-       if (my $indexed = $req->{indexed}) {
+       if (my $indexed = $req->{indexed}) { # duplicated messages
                do_xpost($req, $_) for @$indexed;
        } elsif (exists $req->{new_smsg}) { # totally unseen messsage
                index_unseen($req);
@@ -164,11 +164,10 @@ sub do_step ($) { # main iterator for adding messages to the index
                                                        \&ck_existing, $req);
                                return; # ck_existing calls do_step
                        }
-                       delete $req->{cur_smsg};
                        delete $req->{next_arg};
                }
-               my $mid = shift(@{$req->{mids}});
-               last unless defined $mid;
+               die "BUG: {cur_smsg} still set" if $req->{cur_smsg};
+               my $mid = shift(@{$req->{mids}}) // last;
                my ($id, $prev);
                $req->{next_arg} = [ $mid, \$id, \$prev ];
                # loop again
@@ -176,9 +175,8 @@ sub do_step ($) { # main iterator for adding messages to the index
        do_finalize($req);
 }
 
-sub _blob_missing ($) { # called when req->{cur_smsg}->{blob} is bad
-       my ($req) = @_;
-       my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
+sub _blob_missing ($$) { # called when $smsg->{blob} is bad
+       my ($req, $smsg) = @_;
        my $self = $req->{self};
        my $xref3 = $self->{oidx}->get_xref3($smsg->{num});
        my @keep = grep(!/:$smsg->{blob}\z/, @$xref3);
@@ -196,9 +194,9 @@ sub _blob_missing ($) { # called when req->{cur_smsg}->{blob} is bad
 
 sub ck_existing { # git->cat_async callback
        my ($bref, $oid, $type, $size, $req) = @_;
-       my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
+       my $smsg = delete $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
        if ($type eq 'missing') {
-               _blob_missing($req);
+               _blob_missing($req, $smsg);
        } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
                my $self = $req->{self} // die 'BUG: {self} missing';
                local $self->{current_info} = "$self->{current_info} $oid";
@@ -219,8 +217,7 @@ sub cur_ibx_xnum ($$) {
        $req->{eml} = PublicInbox::Eml->new($bref);
        $req->{chash} = content_hash($req->{eml});
        $req->{mids} = mids($req->{eml});
-       my @q = @{$req->{mids}}; # copy
-       while (defined(my $mid = shift @q)) {
+       for my $mid (@{$req->{mids}}) {
                my ($id, $prev);
                while (my $x = $ibx->over->next_by_mid($mid, \$id, \$prev)) {
                        return $x->{num} if $x->{blob} eq $req->{oid};
@@ -847,12 +844,101 @@ sub sync_inbox {
        warn $err, "\n" if defined($err);
 }
 
+sub dd_smsg { # git->cat_async callback
+       my ($bref, $oid, $type, $size, $dd) = @_;
+       my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+       my $self = $dd->{self} // die 'BUG: {self} missing';
+       my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing';
+       if ($type eq 'missing') {
+               _blob_missing($dd, $smsg);
+       } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+               local $self->{current_info} = "$self->{current_info} $oid";
+               my $chash = content_hash(PublicInbox::Eml->new($bref));
+               push(@{$per_mid->{dd_chash}->{$chash}}, $smsg);
+       }
+       return if $per_mid->{last_smsg} != $smsg;
+       while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) {
+               my $keep = shift @$ary;
+               next if !scalar(@$ary);
+               $per_mid->{sync}->{dedupe_cull} += scalar(@$ary);
+               print STDERR
+                       "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+                       join(', ', map { "#$_->{num}" } @$ary),"\n";
+               next if $per_mid->{sync}->{-opt}->{'dry-run'};
+               my $oidx = $self->{oidx};
+               for my $smsg (@$ary) {
+                       my $gone = $smsg->{num};
+                       $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob});
+                       $self->idx_shard($gone)->ipc_do('xdb_remove', $gone);
+                       $oidx->delete_by_num($gone);
+               }
+       }
+}
+
+sub eidx_dedupe ($$) {
+       my ($self, $sync) = @_;
+       $sync->{dedupe_cull} = 0;
+       my $candidates = 0;
+       my $nr_mid = 0;
+       return unless eidxq_lock_acquire($self);
+       my $iter;
+       my $min_id = 0;
+       local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n";
+dedupe_restart:
+       $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT DISTINCT(mid),id FROM msgid WHERE id IN
+(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
+ORDER BY id
+EOS
+       $iter->execute($min_id);
+       while (my ($mid, $id) = $iter->fetchrow_array) {
+               last if $sync->{quit};
+               $self->{current_info} = "dedupe $mid";
+               ${$sync->{nr}} = $min_id = $id;
+               my ($n, $prv, @smsg);
+               while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+                       push @smsg, $x;
+               }
+               next if scalar(@smsg) < 2;
+               my $per_mid = {
+                       dd_chash => {}, # chash => [ary of smsgs]
+                       last_smsg => $smsg[-1],
+                       sync => $sync
+               };
+               $nr_mid++;
+               $candidates += scalar(@smsg) - 1;
+               for my $smsg (@smsg) {
+                       my $dd = {
+                               per_mid => $per_mid,
+                               smsg => $smsg,
+                               self => $self,
+                       };
+                       $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+               }
+               # need to wait on every single one
+               $self->git->async_wait_all;
+
+               # is checkpoint needed? $iter is a very expensive query to restart
+               if (0 && checkpoint_due($sync)) {
+                       undef $iter;
+                       reindex_checkpoint($self, $sync);
+                       goto dedupe_restart;
+               }
+       }
+       my $n = delete $sync->{dedupe_cull};
+       if (my $pr = $sync->{-opt}->{-progress}) {
+               $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+       }
+       ${$sync->{nr}} = 0;
+}
+
 sub eidx_sync { # main entry point
        my ($self, $opt) = @_;
 
        my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
        local $self->{current_info} = '';
        local $SIG{__WARN__} = sub {
+               return if PublicInbox::Eml::warn_ignore(@_);
                $warn_cb->($self->{current_info}, ': ', @_);
        };
        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
@@ -876,6 +962,10 @@ sub eidx_sync { # main entry point
        for my $ibx (@{$self->{ibx_list}}) {
                $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
        }
+       if (delete($opt->{dedupe})) {
+               local $sync->{checkpoint_unlocks} = 1;
+               eidx_dedupe($self, $sync);
+       }
        if (delete($opt->{reindex})) {
                local $sync->{checkpoint_unlocks} = 1;
                eidx_reindex($self, $sync);
@@ -1145,7 +1235,10 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        my $oldset = PublicInbox::DS::block_signals();
        local $self->{current_info} = '';
        my $cb = $SIG{__WARN__} || \&CORE::warn;
-       local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+       local $SIG{__WARN__} = sub {
+               return if PublicInbox::Eml::warn_ignore(@_);
+               $cb->($self->{current_info}, ': ', @_);
+       };
        my $sig = {
                HUP => sub { eidx_reload($self, $idler) },
                USR1 => sub { eidx_resync_start($self) },