my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
my $docid = $smsg->{num} = $orig_smsg->{num};
$self->{oidx}->add_overview($eml, $smsg); # may rethread
- return if $nr == 1; # likely, all good
-
+ $self->{transact_bytes} += $smsg->{bytes};
+ if ($nr == 1) { # likely, all good
+ $self->idx_shard($docid)->shard_reindex_docid($docid);
+ return;
+ }
warn "W: #$docid split into $nr due to deduplication change\n";
my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
+ my @todo;
for my $ary (values %$by_chash) {
for my $x (reverse @$ary) {
+ warn "removing #$docid xref3 $x->{blob}\n";
my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
}
$e->{blob} eq $x->{blob} or die <<EOF;
$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
EOF
+ push @todo, $ibx, $e;
+ }
+ $self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
+ $self->{oidx}->begin_lazy;
+ $self->idx_shard($docid)->shard_reindex_docid($docid);
+ while (my ($ibx, $e) = splice(@todo, 0, 2)) {
reindex_unseen($self, $sync, $ibx, $e);
}
}
# shards flush on their own, just don't queue up too many
# deletes
- if (($cur % 1000) == 0) {
+ if ($self->{transact_bytes} >= $self->{batch_bytes}) {
$self->git->async_wait_all;
$self->{oidx}->commit_lazy;
$self->{oidx}->begin_lazy;
$pr->("reindexed $cur/$tot\n") if $pr;
+ $self->{transact_bytes} = 0;
}
# this is only for SIGUSR1, shards do their own accounting:
reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};