--reindex can take many hours or days, ensure we release
locks according to --batch-size so automated fetch+index
jobs can write new data to indices while we update old data.
my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
my $docid = $smsg->{num} = $orig_smsg->{num};
$self->{oidx}->add_overview($eml, $smsg); # may rethread
my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
my $docid = $smsg->{num} = $orig_smsg->{num};
$self->{oidx}->add_overview($eml, $smsg); # may rethread
- $self->{transact_bytes} += $smsg->{bytes};
+ check_batch_limit({ %$sync, new_smsg => $smsg });
if ($nr == 1) { # likely, all good
$self->idx_shard($docid)->shard_reindex_docid($docid);
return;
if ($nr == 1) { # likely, all good
$self->idx_shard($docid)->shard_reindex_docid($docid);
return;
warn "E: #$docid does not exist in over\n";
}
$del->execute($docid);
warn "E: #$docid does not exist in over\n";
}
$del->execute($docid);
- my $cur = ++${$sync->{nr}};
-
- # shards flush on their own, just don't queue up too many
- # deletes
- if ($self->{transact_bytes} >= $self->{batch_bytes}) {
- $self->git->async_wait_all;
- $self->{oidx}->commit_lazy;
- $self->{oidx}->begin_lazy;
- $pr->("reindexed $cur/$tot\n") if $pr;
- $self->{transact_bytes} = 0;
- }
# this is only for SIGUSR1, shards do their own accounting:
reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
}
# this is only for SIGUSR1, shards do their own accounting:
reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
}
for my $ibx (@{$self->{ibx_list}}) {
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
for my $ibx (@{$self->{ibx_list}}) {
$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
}
- eidx_reindex($self, $sync) if delete($opt->{reindex});
+ if (delete($opt->{reindex})) {
+ $sync->{checkpoint_unlocks} = 1;
+ eidx_reindex($self, $sync);
+ }
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
die 'BUG: {im} during reindex' if $self->{im};
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
die 'BUG: {im} during reindex' if $self->{im};
- if ($self->{ibx_map}) {
+ if ($self->{ibx_map} && !$sync->{checkpoint_unlocks}) {
checkpoint($self, 1); # no need to release lock on pure index
} else {
$self->done; # release lock
checkpoint($self, 1); # no need to release lock on pure index
} else {
$self->done; # release lock