]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
extsearchidx: ignore Eml warnings across the board
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index a2d70205c9cdb2a7faac5b977ecd3246de9974e2..a421e16bb91a1899cbd4ff1b6bef2f7f86b2b3e5 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Detached/external index cross inbox search indexing support
@@ -20,9 +20,9 @@ use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
 use Carp qw(croak carp);
 use Sys::Hostname qw(hostname);
 use POSIX qw(strftime);
+use File::Glob qw(bsd_glob GLOB_NOSORT);
 use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
-       is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
 use PublicInbox::OverIdx;
 use PublicInbox::MiscIdx;
 use PublicInbox::MID qw(mids);
@@ -30,7 +30,7 @@ use PublicInbox::V2Writable;
 use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
-use PublicInbox::DS qw(now);
+use PublicInbox::DS qw(now add_timer);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
@@ -82,8 +82,6 @@ sub check_batch_limit ($) {
        my ($req) = @_;
        my $self = $req->{self};
        my $new_smsg = $req->{new_smsg};
-
-       # {raw_bytes} may be unset, so just use {bytes}
        my $n = $self->{transact_bytes} += $new_smsg->{bytes};
 
        # set flag for PublicInbox::V2Writable::index_todo:
@@ -102,7 +100,7 @@ sub do_xpost ($$) {
        if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message
                my $xnum = $req->{xnum};
                $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
-               $idx->shard_add_eidx_info($docid, $eidx_key, $eml);
+               $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
                check_batch_limit($req);
        } else { # 'd'
                my $rm_eidx_info;
@@ -110,9 +108,10 @@ sub do_xpost ($$) {
                                                        \$rm_eidx_info);
                if ($nr == 0) {
                        $self->{oidx}->eidxq_del($docid);
-                       $idx->shard_remove($docid);
+                       $idx->ipc_do('xdb_remove', $docid);
                } elsif ($rm_eidx_info) {
-                       $idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+                       $idx->ipc_do('remove_eidx_info',
+                                       $docid, $eidx_key, $eml);
                        $self->{oidx}->eidxq_add($docid); # yes, add
                }
        }
@@ -134,13 +133,13 @@ sub index_unseen ($) {
        my $oid = $new_smsg->{blob};
        my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
        $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-       $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
+       $idx->index_eml($eml, $new_smsg, $ibx->eidx_key);
        check_batch_limit($req);
 }
 
 sub do_finalize ($) {
        my ($req) = @_;
-       if (my $indexed = $req->{indexed}) {
+       if (my $indexed = $req->{indexed}) { # duplicated messages
                do_xpost($req, $_) for @$indexed;
        } elsif (exists $req->{new_smsg}) { # totally unseen messsage
                index_unseen($req);
@@ -165,11 +164,10 @@ sub do_step ($) { # main iterator for adding messages to the index
                                                        \&ck_existing, $req);
                                return; # ck_existing calls do_step
                        }
-                       delete $req->{cur_smsg};
                        delete $req->{next_arg};
                }
-               my $mid = shift(@{$req->{mids}});
-               last unless defined $mid;
+               die "BUG: {cur_smsg} still set" if $req->{cur_smsg};
+               my $mid = shift(@{$req->{mids}}) // last;
                my ($id, $prev);
                $req->{next_arg} = [ $mid, \$id, \$prev ];
                # loop again
@@ -177,9 +175,8 @@ sub do_step ($) { # main iterator for adding messages to the index
        do_finalize($req);
 }
 
-sub _blob_missing ($) { # called when req->{cur_smsg}->{blob} is bad
-       my ($req) = @_;
-       my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
+sub _blob_missing ($$) { # called when $smsg->{blob} is bad
+       my ($req, $smsg) = @_;
        my $self = $req->{self};
        my $xref3 = $self->{oidx}->get_xref3($smsg->{num});
        my @keep = grep(!/:$smsg->{blob}\z/, @$xref3);
@@ -197,9 +194,9 @@ sub _blob_missing ($) { # called when req->{cur_smsg}->{blob} is bad
 
 sub ck_existing { # git->cat_async callback
        my ($bref, $oid, $type, $size, $req) = @_;
-       my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
+       my $smsg = delete $req->{cur_smsg} or die 'BUG: {cur_smsg} missing';
        if ($type eq 'missing') {
-               _blob_missing($req);
+               _blob_missing($req, $smsg);
        } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
                my $self = $req->{self} // die 'BUG: {self} missing';
                local $self->{current_info} = "$self->{current_info} $oid";
@@ -220,8 +217,7 @@ sub cur_ibx_xnum ($$) {
        $req->{eml} = PublicInbox::Eml->new($bref);
        $req->{chash} = content_hash($req->{eml});
        $req->{mids} = mids($req->{eml});
-       my @q = @{$req->{mids}}; # copy
-       while (defined(my $mid = shift @q)) {
+       for my $mid (@{$req->{mids}}) {
                my ($id, $prev);
                while (my $x = $ibx->over->next_by_mid($mid, \$id, \$prev)) {
                        return $x->{num} if $x->{blob} eq $req->{oid};
@@ -238,7 +234,7 @@ sub index_oid { # git->cat_async callback for 'm'
        my $new_smsg = $req->{new_smsg} = bless {
                blob => $oid,
        }, 'PublicInbox::Smsg';
-       $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+       $new_smsg->set_bytes($$bref, $size);
        defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
        ++${$req->{nr}};
        do_step($req);
@@ -327,7 +323,7 @@ DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
                }
        } else {
                warn "I: remove #$docid $eidx_key @oid\n";
-               $self->idx_shard($docid)->shard_remove($docid);
+               $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
        }
 }
 
@@ -338,9 +334,11 @@ sub eidx_gc {
        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
 
        my $dbh = $self->{oidx}->dbh;
+       $dbh->do('PRAGMA case_sensitive_like = ON'); # only place we use LIKE
        my $x3_doc = $dbh->prepare('SELECT docid FROM xref3 WHERE ibx_id = ?');
        my $ibx_ck = $dbh->prepare('SELECT ibx_id,eidx_key FROM inboxes');
-       my $lc_i = $dbh->prepare('SELECT key FROM eidx_meta WHERE key LIKE ?');
+       my $lc_i = $dbh->prepare(<<'');
+SELECT key FROM eidx_meta WHERE key LIKE ? ESCAPE ?
 
        $ibx_ck->execute;
        while (my ($ibx_id, $eidx_key) = $ibx_ck->fetchrow_array) {
@@ -356,8 +354,8 @@ DELETE FROM inboxes WHERE ibx_id = ?
 
                # drop last_commit info
                my $pat = $eidx_key;
-               $pat =~ s/([_%])/\\$1/g;
-               $lc_i->execute("lc-%:$pat//%");
+               $pat =~ s/([_%\\])/\\$1/g;
+               $lc_i->execute("lc-%:$pat//%", '\\');
                while (my ($key) = $lc_i->fetchrow_array) {
                        next if $key !~ m!\Alc-v[1-9]+:\Q$eidx_key\E//!;
                        warn "I: removing $key\n";
@@ -436,11 +434,11 @@ sub _reindex_finalize ($$$) {
        my $top_smsg = pop @$stable;
        $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
        my $ibx = _ibx_for($self, $sync, $smsg);
-       $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+       $idx->index_eml($eml, $smsg, $ibx->eidx_key);
        for my $x (reverse @$stable) {
                $ibx = _ibx_for($self, $sync, $x);
                my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
-               $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
+               $idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr);
        }
        return if $nr == 1; # likely, all good
 
@@ -483,19 +481,19 @@ sub _reindex_oid { # git->cat_async callback
                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
                if ($remain == 0) {
                        warn "W: #$docid gone or corrupted\n";
-                       $self->idx_shard($docid)->shard_remove($docid);
+                       $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
                } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
                } else {
                        warn "BUG: #$docid gone (UNEXPECTED)\n";
-                       $self->idx_shard($docid)->shard_remove($docid);
+                       $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
                }
                return;
        }
        my $ci = $self->{current_info};
        local $self->{current_info} = "$ci #$docid $oid";
        my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
-       $re_smsg->{bytes} = $size + crlf_adjust($$bref);
+       $re_smsg->set_bytes($$bref, $size);
        my $eml = PublicInbox::Eml->new($bref);
        $re_smsg->populate($eml, { autime => $orig_smsg->{ds},
                                cotime => $orig_smsg->{ts} });
@@ -522,7 +520,7 @@ sub _reindex_smsg ($$$) {
 BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 
                $self->{oidx}->delete_by_num($docid);
-               $self->idx_shard($docid)->shard_remove($docid);
+               $self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
                return;
        }
 
@@ -675,7 +673,7 @@ sub _reindex_unseen { # git->cat_async callback
        my $self = $req->{self} // die 'BUG: {self} unset';
        local $self->{current_info} = "$self->{current_info} $oid";
        my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
-       $new_smsg->{bytes} = $size + crlf_adjust($$bref);
+       $new_smsg->set_bytes($$bref, $size);
        my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
        $req->{new_smsg} = $new_smsg;
        $req->{chash} = content_hash($eml);
@@ -799,10 +797,10 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
                        if (scalar(@$xr3) == 0) { # all gone
                                $self->{oidx}->delete_by_num($docid);
                                $self->{oidx}->eidxq_del($docid);
-                               $idx->shard_remove($docid);
+                               $idx->ipc_do('xdb_remove', $docid);
                        } else { # enqueue for reindex of remaining messages
-                               $idx->shard_remove_eidx_info($docid,
-                                                       $ibx->eidx_key);
+                               $idx->ipc_do('remove_eidx_info',
+                                               $docid, $ibx->eidx_key);
                                $self->{oidx}->eidxq_add($docid); # yes, add
                        }
                }
@@ -846,12 +844,101 @@ sub sync_inbox {
        warn $err, "\n" if defined($err);
 }
 
+sub dd_smsg { # git->cat_async callback
+       my ($bref, $oid, $type, $size, $dd) = @_;
+       my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing';
+       my $self = $dd->{self} // die 'BUG: {self} missing';
+       my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing';
+       if ($type eq 'missing') {
+               _blob_missing($dd, $smsg);
+       } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) {
+               local $self->{current_info} = "$self->{current_info} $oid";
+               my $chash = content_hash(PublicInbox::Eml->new($bref));
+               push(@{$per_mid->{dd_chash}->{$chash}}, $smsg);
+       }
+       return if $per_mid->{last_smsg} != $smsg;
+       while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) {
+               my $keep = shift @$ary;
+               next if !scalar(@$ary);
+               $per_mid->{sync}->{dedupe_cull} += scalar(@$ary);
+               print STDERR
+                       "# <$keep->{mid}> keeping #$keep->{num}, dropping ",
+                       join(', ', map { "#$_->{num}" } @$ary),"\n";
+               next if $per_mid->{sync}->{-opt}->{'dry-run'};
+               my $oidx = $self->{oidx};
+               for my $smsg (@$ary) {
+                       my $gone = $smsg->{num};
+                       $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob});
+                       $self->idx_shard($gone)->ipc_do('xdb_remove', $gone);
+                       $oidx->delete_by_num($gone);
+               }
+       }
+}
+
+sub eidx_dedupe ($$) {
+       my ($self, $sync) = @_;
+       $sync->{dedupe_cull} = 0;
+       my $candidates = 0;
+       my $nr_mid = 0;
+       return unless eidxq_lock_acquire($self);
+       my $iter;
+       my $min_id = 0;
+       local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n";
+dedupe_restart:
+       $iter = $self->{oidx}->dbh->prepare(<<EOS);
+SELECT DISTINCT(mid),id FROM msgid WHERE id IN
+(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1)
+ORDER BY id
+EOS
+       $iter->execute($min_id);
+       while (my ($mid, $id) = $iter->fetchrow_array) {
+               last if $sync->{quit};
+               $self->{current_info} = "dedupe $mid";
+               ${$sync->{nr}} = $min_id = $id;
+               my ($n, $prv, @smsg);
+               while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) {
+                       push @smsg, $x;
+               }
+               next if scalar(@smsg) < 2;
+               my $per_mid = {
+                       dd_chash => {}, # chash => [ary of smsgs]
+                       last_smsg => $smsg[-1],
+                       sync => $sync
+               };
+               $nr_mid++;
+               $candidates += scalar(@smsg) - 1;
+               for my $smsg (@smsg) {
+                       my $dd = {
+                               per_mid => $per_mid,
+                               smsg => $smsg,
+                               self => $self,
+                       };
+                       $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd);
+               }
+               # need to wait on every single one
+               $self->git->async_wait_all;
+
+               # is checkpoint needed? $iter is a very expensive query to restart
+               if (0 && checkpoint_due($sync)) {
+                       undef $iter;
+                       reindex_checkpoint($self, $sync);
+                       goto dedupe_restart;
+               }
+       }
+       my $n = delete $sync->{dedupe_cull};
+       if (my $pr = $sync->{-opt}->{-progress}) {
+               $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
+       }
+       ${$sync->{nr}} = 0;
+}
+
 sub eidx_sync { # main entry point
        my ($self, $opt) = @_;
 
        my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
        local $self->{current_info} = '';
        local $SIG{__WARN__} = sub {
+               return if PublicInbox::Eml::warn_ignore(@_);
                $warn_cb->($self->{current_info}, ': ', @_);
        };
        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
@@ -875,6 +962,10 @@ sub eidx_sync { # main entry point
        for my $ibx (@{$self->{ibx_list}}) {
                $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
        }
+       if (delete($opt->{dedupe})) {
+               local $sync->{checkpoint_unlocks} = 1;
+               eidx_dedupe($self, $sync);
+       }
        if (delete($opt->{reindex})) {
                local $sync->{checkpoint_unlocks} = 1;
                eidx_reindex($self, $sync);
@@ -926,10 +1017,35 @@ sub update_last_commit { # overrides V2Writable
 
 sub _idx_init { # with_umask callback
        my ($self, $opt) = @_;
-       PublicInbox::V2Writable::_idx_init($self, $opt);
+       PublicInbox::V2Writable::_idx_init($self, $opt); # acquires ei.lock
        $self->{midx} = PublicInbox::MiscIdx->new($self);
 }
 
+sub symlink_packs ($$) {
+       my ($ibx, $pd) = @_;
+       my $ret = 0;
+       my $glob = "$ibx->{inboxdir}/git/*.git/objects/pack/*.idx";
+       for my $idx (bsd_glob($glob, GLOB_NOSORT)) {
+               my $src = substr($idx, 0, -length('.idx'));
+               my $dst = $pd . substr($src, rindex($src, '/'));
+               if (-f "$src.pack" and
+                               symlink("$src.pack", "$dst.pack") and
+                               symlink($idx, "$dst.idx") and
+                               -f $idx) {
+                       ++$ret;
+                       # .promisor, .bitmap, .rev and .keep are optional
+                       # XXX should we symlink .keep here?
+                       for my $s (qw(promisor bitmap rev)) {
+                               symlink("$src.$s", "$dst.$s") if -f "$src.$s";
+                       }
+               } elsif (!$!{EEXIST}) {
+                       warn "W: ln -s $src.{pack,idx} => $dst.*: $!\n";
+                       unlink "$dst.pack", "$dst.idx";
+               }
+       }
+       $ret;
+}
+
 sub idx_init { # similar to V2Writable
        my ($self, $opt) = @_;
        return if $self->{idx_shards};
@@ -985,7 +1101,24 @@ sub idx_init { # similar to V2Writable
                        }
                }
        }
+       # git-multi-pack-index(1) can speed up "git cat-file" startup slightly
+       my $dh;
+       my $git_midx = 0;
+       my $pd = "$ALL/objects/pack";
+       if (!mkdir($pd) && $!{EEXIST} && opendir($dh, $pd)) {
+               # drop stale symlinks
+               while (defined(my $dn = readdir($dh))) {
+                       if ($dn =~ /\.(?:idx|pack|promisor|bitmap|rev)\z/) {
+                               my $f = "$pd/$dn";
+                               unlink($f) if -l $f && !-e $f;
+                       }
+               }
+               undef $dh;
+       }
        for my $ibx (@{$self->{ibx_list}}) {
+               # create symlinks for multi-pack-index
+               $git_midx += symlink_packs($ibx, $pd);
+               # add new lines to our alternates file
                my $line = $ibx->git->{git_dir} . "/objects\n";
                chomp(my $d = $line);
                if (my @st = stat($d)) {
@@ -1001,11 +1134,18 @@ sub idx_init { # similar to V2Writable
                my $o = \@old;
                PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
        }
+       $git_midx and $self->with_umask(sub {
+               my @cmd = ('multi-pack-index');
+               push @cmd, '--no-progress' if ($opt->{quiet}//0) > 1;
+               my $lk = $self->lock_for_scope;
+               system('git', "--git-dir=$ALL", @cmd, 'write');
+               # ignore errors, fairly new command, may not exist
+       });
        $self->parallel_init($self->{indexlevel});
        $self->with_umask(\&_idx_init, $self, $opt);
        $self->{oidx}->begin_lazy;
        $self->{oidx}->eidx_prep;
-       $self->{midx}->begin_txn;
+       $self->{midx}->create_xdb if @new;
 }
 
 sub _watch_commit { # PublicInbox::DS::add_timer callback
@@ -1013,8 +1153,9 @@ sub _watch_commit { # PublicInbox::DS::add_timer callback
        delete $self->{-commit_timer};
        eidxq_process($self, $self->{-watch_sync});
        eidxq_release($self);
-       delete local $self->{-watch_sync}->{-regen_fmt};
+       my $fmt = delete $self->{-watch_sync}->{-regen_fmt};
        reindex_checkpoint($self, $self->{-watch_sync});
+       $self->{-watch_sync}->{-regen_fmt} = $fmt;
 
        # call event_step => done unless commit_timer is armed
        PublicInbox::DS::requeue($self);
@@ -1029,8 +1170,7 @@ sub on_inbox_unlock { # called by PublicInbox::InboxIdle
        $pr->("indexing $ekey\n") if $pr;
        $self->idx_init($opt);
        sync_inbox($self, $self->{-watch_sync}, $ibx);
-       $self->{-commit_timer} //= PublicInbox::DS::add_timer(
-                                       $opt->{'commit-interval'} // 10,
+       $self->{-commit_timer} //= add_timer($opt->{'commit-interval'} // 10,
                                        \&_watch_commit, $self);
 }
 
@@ -1092,10 +1232,13 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        $pr->("performing initial scan ...\n") if $pr;
        my $sync = eidx_sync($self, $opt); # initial sync
        return if $sync->{quit};
-       my $oldset = PublicInbox::Sigfd::block_signals();
+       my $oldset = PublicInbox::DS::block_signals();
        local $self->{current_info} = '';
        my $cb = $SIG{__WARN__} || \&CORE::warn;
-       local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
+       local $SIG{__WARN__} = sub {
+               return if PublicInbox::Eml::warn_ignore(@_);
+               $cb->($self->{current_info}, ': ', @_);
+       };
        my $sig = {
                HUP => sub { eidx_reload($self, $idler) },
                USR1 => sub { eidx_resync_start($self) },
@@ -1110,7 +1253,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        if (!$sigfd) {
                # wake up every second to accept signals if we don't
                # have signalfd or IO::KQueue:
-               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS::sig_setmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
@@ -1130,5 +1273,6 @@ no warnings 'once';
 *atfork_child = \&PublicInbox::V2Writable::atfork_child;
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
 *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
+*checkpoint = \&PublicInbox::V2Writable::checkpoint;
 
 1;