my $oid = $new_smsg->{blob};
my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
- $idx->index_raw(undef, $eml, $new_smsg, $ibx);
+ $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
check_batch_limit($req);
}
done($self);
}
+sub _ibx_for ($$$) { # args: $self, $sync, $smsg; yields the matching {ibx_list} entry
+ my ($self, $sync, $smsg) = @_;
+ my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset'; # removes {ibx_id} from $smsg; dies if it was undef
+ my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos"; # {id2pos} maps $ibx_id to an index into {ibx_list}
+ $self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped" # last expression: implicit return value
+}
+
sub _reindex_finalize ($$$) {
my ($req, $smsg, $eml) = @_;
my $sync = $req->{sync};
my $self = $sync->{self};
- my $by_chash = $req->{by_chash};
+ my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
my $docid = $smsg->{num} = $orig_smsg->{num};
$self->{oidx}->add_overview($eml, $smsg); # may rethread
check_batch_limit({ %$sync, new_smsg => $smsg });
- if ($nr == 1) { # likely, all good
- $self->idx_shard($docid)->shard_reindex_docid($docid);
- return;
+ my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+ my $stable = delete($by_chash->{$chash0}) //
+ die "BUG: $smsg->{blob} chash missing";
+ my $idx = $self->idx_shard($docid);
+ my $top_smsg = pop @$stable;
+ $top_smsg == $smsg or die 'BUG: top_smsg != smsg';
+ my $ibx = _ibx_for($self, $sync, $smsg);
+ $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+ for my $x (reverse @$stable) {
+ $ibx = _ibx_for($self, $sync, $x);
+ my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
+ $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
}
+ return if $nr == 1; # likely, all good
+
warn "W: #$docid split into $nr due to deduplication change\n";
- my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
- delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
my @todo;
for my $ary (values %$by_chash) {
for my $x (reverse @$ary) {
my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
}
- my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+ my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
$x->{num} = delete($x->{xnum}) // die '{xnum} unset';
- my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
- my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
- my $ibx = $self->{ibx_list}->[$pos] //
- die "BUG: ibx for $x->{blob} not mapped";
+ $ibx = _ibx_for($self, $sync, $x);
my $e = $ibx->over->get_art($x->{num});
$e->{blob} eq $x->{blob} or die <<EOF;
$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
EOF
push @todo, $ibx, $e;
}
- $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
- $self->{oidx}->begin_lazy;
- $self->idx_shard($docid)->shard_reindex_docid($docid);
+ undef $by_chash;
while (my ($ibx, $e) = splice(@todo, 0, 2)) {
reindex_unseen($self, $sync, $ibx, $e);
}
my $sync = $req->{sync};
my $self = $sync->{self};
my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
- my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+ my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
my $docid = $orig_smsg->{num};
if (is_bad_blob($oid, $type, $size, $expect_oid)) {
my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
if ($remain == 0) {
warn "W: #$docid gone or corrupted\n";
$self->idx_shard($docid)->shard_remove($docid);
- } elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+ } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
$self->git->cat_async($next_oid, \&_reindex_oid, $req);
} else {
warn "BUG: #$docid gone (UNEXPECTED)\n";
cotime => $orig_smsg->{ts} });
my $chash = content_hash($eml);
$re_smsg->{chash} = $chash;
- $re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
- $re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+ $re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
+ $re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
+ $re_smsg->{hdr} = $eml->header_obj;
push @{$req->{by_chash}->{$chash}}, $re_smsg;
- if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+ if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
$self->git->cat_async($next_oid, \&_reindex_oid, $req);
} else { # last $re_smsg is the highest priority xref3
local $self->{current_info} = "$ci #$docid";
return;
}
- # we sort {xr3} in the reverse order of {ibx_list} so we can
+ # we sort {xr3r} in the reverse order of {ibx_list} so we can
# hit the common case in _reindex_finalize without rereading
# from git (or holding multiple messages in memory).
my $id2pos = $sync->{id2pos}; # index in {ibx_list}
$b->[1] <=> $a->[1] # break ties with {xnum}
} @$xr3;
@$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
- my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+ my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
}
sub eidxq_process ($$) { # for reindexing
my ($self, $sync) = @_;
- $self->{oidx}->commit_lazy; # ensure shard workers can see it
- $self->{oidx}->begin_lazy;
my $dbh = $self->{oidx}->dbh;
my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
${$sync->{nr}} = 0;