]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/ExtSearchIdx.pm
extsearchidx: simplify reindex code paths
[public-inbox.git] / lib / PublicInbox / ExtSearchIdx.pm
index 3b021a1bc212d8aa25a38be929114d4be25beddf..d529573551c6fd8b8f6e89ae3b901f0880e28186 100644 (file)
@@ -149,7 +149,7 @@ sub index_unseen ($) {
        my $oid = $new_smsg->{blob};
        my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
        $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-       $idx->index_raw(undef, $eml, $new_smsg, $ibx);
+       $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
        check_batch_limit($req);
 }
 
@@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
        done($self);
 }
 
+sub _ibx_for ($$$) {
+       my ($self, $sync, $smsg) = @_;
+       my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
+       my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+       $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+}
+
 sub _reindex_finalize ($$$) {
        my ($req, $smsg, $eml) = @_;
        my $sync = $req->{sync};
        my $self = $sync->{self};
-       my $by_chash = $req->{by_chash};
+       my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
        my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
        my $docid = $smsg->{num} = $orig_smsg->{num};
        $self->{oidx}->add_overview($eml, $smsg); # may rethread
        check_batch_limit({ %$sync, new_smsg => $smsg });
-       if ($nr == 1) { # likely, all good
-               $self->idx_shard($docid)->shard_reindex_docid($docid);
-               return;
+       my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+       my $stable = delete($by_chash->{$chash0}) //
+                               die "BUG: $smsg->{blob} chash missing";
+       my $idx = $self->idx_shard($docid);
+       my $top_smsg = pop @$stable;
+       $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
+       my $ibx = _ibx_for($self, $sync, $smsg);
+       $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+       for my $x (reverse @$stable) {
+               $ibx = _ibx_for($self, $sync, $x);
+               my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
+               $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
        }
+       return if $nr == 1; # likely, all good
+
        warn "W: #$docid split into $nr due to deduplication change\n";
-       my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
-       delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
        my @todo;
        for my $ary (values %$by_chash) {
                for my $x (reverse @$ary) {
@@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) {
                        my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
                        die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
                }
-               my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+               my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
                $x->{num} = delete($x->{xnum}) // die '{xnum} unset';
-               my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
-               my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
-               my $ibx = $self->{ibx_list}->[$pos] //
-                       die "BUG: ibx for $x->{blob} not mapped";
+               $ibx = _ibx_for($self, $sync, $x);
                my $e = $ibx->over->get_art($x->{num});
                $e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
                push @todo, $ibx, $e;
        }
-       $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
-       $self->{oidx}->begin_lazy;
-       $self->idx_shard($docid)->shard_reindex_docid($docid);
+       undef $by_chash;
        while (my ($ibx, $e) = splice(@todo, 0, 2)) {
                reindex_unseen($self, $sync, $ibx, $e);
        }
@@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback
        my $sync = $req->{sync};
        my $self = $sync->{self};
        my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
-       my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+       my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
        my $docid = $orig_smsg->{num};
        if (is_bad_blob($oid, $type, $size, $expect_oid)) {
                my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
                if ($remain == 0) {
                        warn "W: #$docid gone or corrupted\n";
                        $self->idx_shard($docid)->shard_remove($docid);
-               } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+               } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                        $self->git->cat_async($next_oid, \&_reindex_oid, $req);
                } else {
                        warn "BUG: #$docid gone (UNEXPECTED)\n";
@@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback
                                cotime => $orig_smsg->{ts} });
        my $chash = content_hash($eml);
        $re_smsg->{chash} = $chash;
-       $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
-       $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+       $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
+       $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
+       $re_smsg->{hdr} = $eml->header_obj;
        push @{$req->{by_chash}->{$chash}}, $re_smsg;
-       if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+       if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
                $self->git->cat_async($next_oid, \&_reindex_oid, $req);
        } else { # last $re_smsg is the highest priority xref3
                local $self->{current_info} = "$ci #$docid";
@@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                return;
        }
 
-       # we sort {xr3} in the reverse order of {ibx_list} so we can
+       # we sort {xr3r} in the reverse order of {ibx_list} so we can
        # hit the common case in _reindex_finalize without rereading
        # from git (or holding multiple messages in memory).
        my $id2pos = $sync->{id2pos}; # index in {ibx_list}
@@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
                $b->[1] <=> $a->[1] # break ties with {xnum}
        } @$xr3;
        @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
-       my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+       my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
        $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
 sub eidxq_process ($$) { # for reindexing
        my ($self, $sync) = @_;
 
-       $self->{oidx}->commit_lazy; # ensure shard workers can see it
-       $self->{oidx}->begin_lazy;
        my $dbh = $self->{oidx}->dbh;
        my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
        ${$sync->{nr}} = 0;