]> Sergey Matveev's repositories - public-inbox.git/commitdiff
extsearchidx: simplify reindex code paths
authorEric Wong <e@80x24.org>
Tue, 15 Dec 2020 02:02:22 +0000 (02:02 +0000)
committerEric Wong <e@80x24.org>
Thu, 17 Dec 2020 19:13:14 +0000 (19:13 +0000)
Since we're inside a Xapian transaction, calling ->index_raw
followed by ->shard_add_eidx_info calls on the same docid
doesn't seem to hurt indexing performance.  It definitely
reduces FS read traffic and IPC from git at the cost of some
more IPC between the parent and workers.  Nevertheless, the code
and FD reductions seem worth it.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxShard.pm

index 3b021a1bc212d8aa25a38be929114d4be25beddf..d529573551c6fd8b8f6e89ae3b901f0880e28186 100644 (file)
@@ -149,7 +149,7 @@ sub index_unseen ($) {
        my $oid = $new_smsg->{blob};
        my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
        $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-       $idx->index_raw(undef, $eml, $new_smsg, $ibx);
+       $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
        check_batch_limit($req);
 }
 
@@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
        done($self);
 }
 
+sub _ibx_for ($$$) {
+       my ($self, $sync, $smsg) = @_;
+       my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
+       my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+       $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+}
+
 sub _reindex_finalize ($$$) {
        my ($req, $smsg, $eml) = @_;
        my $sync = $req->{sync};
        my $self = $sync->{self};
-       my $by_chash = $req->{by_chash};
+       my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
        my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
        my $docid = $smsg->{num} = $orig_smsg->{num};
        $self->{oidx}->add_overview($eml, $smsg); # may rethread
        check_batch_limit({ %$sync, new_smsg => $smsg });
-       if ($nr == 1) { # likely, all good
-               $self->idx_shard($docid)->shard_reindex_docid($docid);
-               return;
+       my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+       my $stable = delete($by_chash->{$chash0}) //
+                               die "BUG: $smsg->{blob} chash missing";
+       my $idx = $self->idx_shard($docid);
+       my $top_smsg = pop @$stable;
+       $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
+       my $ibx = _ibx_for($self, $sync, $smsg);
+       $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+       for my $x (reverse @$stable) {
+               $ibx = _ibx_for($self, $sync, $x);
+               my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
+               $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
        }
+       return if $nr == 1; # likely, all good
+
        warn "W: #$docid split into $nr due to deduplication change\n";
-       my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
-       delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
        my @todo;
        for my $ary (values %$by_chash) {
                for my $x (reverse @$ary) {
@@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) {
                        my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
                        die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
                }
-               my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+               my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
                $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
-               my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
-               my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
-               my $ibx = $self->{ibx_list}->[$pos] //
-                       die "BUG: ibx for $x->{blob} not mapped";
+               $ibx = _ibx_for($self, $sync, $x);
                my $e = $ibx->over->get_art($x->{num});
                $e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
                push @todo, $ibx, $e;
        }
-       $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
-       $self->{oidx}->begin_lazy;
-       $self->idx_shard($docid)->shard_reindex_docid($docid);
+       undef $by_chash;
        while (my ($ibx, $e) = splice(@todo, 0, 2)) {
                reindex_unseen($self, $sync, $ibx, $e);
        }
@@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback
        my $sync = $req->{sync};
        my $self = $sync->{self};
        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
-       my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+       my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
        my $docid = $orig_smsg->{num};
        if (is_bad_blob($oid, $type, $size, $expect_oid)) {
                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
                if ($remain == 0) {
                        warn "W: #$docid gone or corrupted\n";
                        $self->idx_shard($docid)->shard_remove($docid);
-               } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+               } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
                } else {
                        warn "BUG: #$docid gone (UNEXPECTED)\n";
@@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback
                                cotime => $orig_smsg->{ts} });
        my $chash = content_hash($eml);
        $re_smsg->{chash} = $chash;
-       $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
-       $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+       $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
+       $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
+       $re_smsg->{hdr} = $eml->header_obj;
        push @{$req->{by_chash}->{$chash}}, $re_smsg;
-       if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+       if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                $self->git->cat_async($next_oid, \&_reindex_oid, $req);
        } else { # last $re_smsg is the highest priority xref3
                local $self->{current_info} = "$ci #$docid";
@@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                return;
        }
 
-       # we sort {xr3} in the reverse order of {ibx_list} so we can
+       # we sort {xr3r} in the reverse order of {ibx_list} so we can
        # hit the common case in _reindex_finalize without rereading
        # from git (or holding multiple messages in memory).
        my $id2pos = $sync->{id2pos}; # index in {ibx_list}
@@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                $b->[1] <=> $a->[1] # break ties with {xnum}
        } @$xr3;
        @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
-       my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+       my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
        $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
 sub eidxq_process ($$) { # for reindexing
        my ($self, $sync) = @_;
 
-       $self->{oidx}->commit_lazy; # ensure shard workers can see it
-       $self->{oidx}->begin_lazy;
        my $dbh = $self->{oidx}->dbh;
        my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
        ${$sync->{nr}} = 0;
index cd8f4dd763b7bc20e1ee4ca0abe11ba1386e475d..c6d2a0e88ba61dc9f7310aa40f2aab92966e621e 100644 (file)
@@ -1008,68 +1008,4 @@ SELECT COUNT(*) FROM over WHERE num = ?
        }
 }
 
-sub reindex_xap { # git->cat_async callback
-       my ($bref, $oid, $type, $size, $ary) = @_;
-       my ($ibx_id, $oidhex, $req, $more) = @$ary;
-       my $self = $req->{self} // die 'BUG: {self} missing';
-       my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
-       my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
-                       die "BUG: bad ibx_id=$ibx_id ($oid)";
-
-       my $docid = $req->{docid};
-       local $eidx->{current_info} = "#$docid $oid";
-       return if is_bad_blob($oid, $type, $size, $oidhex);
-       if (my $doc = $req->{doc}) { # modify existing doc
-               $req->{tg_isset} //= do { # for existing documents in {xdb}
-                       term_generator($self)->set_document($doc);
-                       1;
-               };
-               $doc->add_boolean_term('O'.$eidx_key);
-               index_list_id($self, $doc, PublicInbox::Eml->new($bref));
-       } else { # first time seeing this doc
-               my $smsg = $self->{eidx}->over->get_art($docid) //
-                       die "BUG: #$docid ($oid) not in over";
-               $smsg->{bytes} = $size + crlf_adjust($$bref);
-               $smsg->{eidx_key} = $eidx_key;
-               my $eml = PublicInbox::Eml->new($bref);
-               $req->{doc} = eml2doc($self, $eml, $smsg);
-               $req->{tg_isset} = 1; # eml2doc calls $tg->set_document
-       }
-       return if $more;
-       my $doc = delete($req->{doc}) or return; # all bad blobs!
-       $eidx->{transact_bytes} += $size;
-       $self->{xdb}->replace_document($req->{docid}, $doc);
-}
-
-sub reindex_docid {
-       my ($self, $docid) = @_;
-       my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
-       my $eidx_key_for = $self->{-eidx_key_for} //= do {
-               my %eidx_key_for = map {
-                       $_->[0] => $_->[1];
-               } @{$eidx->over->dbh->selectall_arrayref(<<'')};
-SELECT ibx_id,eidx_key FROM inboxes
-
-               \%eidx_key_for;
-       };
-
-       begin_txn_lazy($self);
-       my $doc = eval { $self->{xdb}->get_document($docid) };
-       my $req = { doc => $doc, self => $self, docid => $docid };
-       my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
-SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
-
-       $sth->execute($docid);
-       my $rows = $sth->fetchall_arrayref;
-       while (my $row = shift(@$rows)) {
-               my ($ibx_id, $oidbin) = @$row;
-               my $oidhex = unpack('H*', $oidbin);
-               $eidx->git->cat_async($oidhex, \&reindex_xap,
-                               [ $ibx_id, $oidhex, $req, scalar(@$rows) ]);
-       }
-       if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) {
-               commit_txn_lazy($self);
-       }
-}
-
 1;
index b6eef6bdd4b80bc6600af0d100ba5b0260a66d55..ee00858b0a32d3cc0fa50cc95abd70120231586c 100644 (file)
@@ -94,8 +94,6 @@ sub shard_worker_loop ($$$$$) {
                        my $over_fn = $1;
                        $over_fn =~ tr/\0/\n/;
                        $self->over_check(PublicInbox::Over->new($over_fn));
-               } elsif ($line =~ /\AE ([0-9]+)\n/) {
-                       $self->reindex_docid($1 + 0);
                } else {
                        chomp $line;
                        my $eidx_key;
@@ -124,9 +122,9 @@ sub shard_worker_loop ($$$$$) {
 }
 
 sub index_raw {
-       my ($self, $msgref, $eml, $smsg, $ibx) = @_;
+       my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
        if (my $w = $self->{w}) {
-               my @ekey = $ibx ? ('X='.$ibx->eidx_key."\0") : ();
+               my @ekey = defined($eidx_key) ? ("X=$eidx_key\0") : ();
                $msgref //= \($eml->as_string);
                $smsg->{raw_bytes} //= length($$msgref);
                # mid must be last, it can contain spaces (but not LF)
@@ -140,7 +138,7 @@ sub index_raw {
                        $eml = PublicInbox::Eml->new($msgref);
                }
                $self->begin_txn_lazy;
-               $smsg->{eidx_key} = $ibx->eidx_key if $ibx;
+               $smsg->{eidx_key} = $eidx_key if defined $eidx_key;
                $self->add_message($eml, $smsg);
        }
 }
@@ -225,13 +223,4 @@ sub shard_over_check {
        }
 }
 
-sub shard_reindex_docid {
-       my ($self, $docid) = @_;
-       if (my $w = $self->{w}) {
-               print $w "E $docid\n" or die "failed to write to shard: $!";
-       } else {
-               $self->reindex_docid($docid);
-       }
-}
-
 1;