$self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb);
}
-sub batch_bytes ($) {
- my ($self) = @_;
- ($self->{parallel} ? $self->{shards} : 1) *
- $PublicInbox::SearchIdx::BATCH_BYTES;
-}
-
# indexes a message, returns true if checkpointing is needed
sub do_idx ($$$$) {
my ($self, $msgref, $mime, $smsg) = @_;
my $idx = idx_shard($self, $smsg->{num} % $self->{shards});
$idx->index_raw($msgref, $mime, $smsg);
my $n = $self->{transact_bytes} += $smsg->{raw_bytes};
- $n >= batch_bytes($self);
+ $n >= $self->{batch_bytes};
}
sub _add {
# xcpdb can change shard count while -watch is idle
my $nshards = count_shards($self);
$self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
+ $self->{batch_bytes} = $opt->{batch_size} //
+ $PublicInbox::SearchIdx::BATCH_BYTES;
+ $self->{batch_bytes} *= $self->{shards} if $self->{parallel};
# need to create all shards before initializing msgmap FD
# idx_shards must be visible to all forked processes
}
# allow -watch or -mda to write...
- $self->idx_init; # reacquire lock
+ $self->idx_init($sync->{-opt}); # reacquire lock
$mm_tmp->atfork_parent if $mm_tmp;
}
$pr->("Xapian indexlevel=$ibx->{indexlevel} ".
"$beg..$end (% $step)\n");
}
- my $batch_bytes = batch_bytes($self);
for (my $num = $beg; $num <= $end; $num += $step) {
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{v2w} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
- if ($self->{transact_bytes} >= $batch_bytes) {
+ if ($self->{transact_bytes} >= $self->{batch_bytes}) {
${$sync->{nr}} = $num;
reindex_checkpoint($self, $sync);
}
$self->{current_info} = "$i.git $oid";
if ($f eq 'm') {
my $arg = { %$sync, autime => $at, cotime => $ct };
- if ($sync->{index_max_size}) {
+ if ($sync->{max_size}) {
$all->check_async($oid, \&check_size, $arg);
} else {
$all->cat_async($oid, \&index_oid, $arg);
sub xapian_only {
my ($self, $opt, $sync, $art_beg) = @_;
- my $seq = $opt->{sequentialshard};
+ my $seq = $opt->{sequential_shard};
$art_beg //= 0;
local $self->{parallel} = 0 if $seq;
$self->idx_init($opt); # acquire lock
sub index_sync {
my ($self, $opt) = @_;
$opt //= $_[1] //= {};
- goto \&xapian_only if $opt->{xapianonly};
+ goto \&xapian_only if $opt->{xapian_only};
my $pr = $opt->{-progress};
my $epoch_max;
my $latest = git_dir_latest($self, \$epoch_max);
return unless defined $latest;
- my $seq = $opt->{sequentialshard};
+ my $seq = $opt->{sequential_shard};
my $art_beg; # the NNTP article number we start xapian_only at
my $idxlevel = $self->{ibx}->{indexlevel};
local $self->{ibx}->{indexlevel} = 'basic' if $seq;
$art_beg++ if defined($art_beg);
}
}
- if ($sync->{index_max_size} = $self->{ibx}->{index_max_size}) {
+ if ($sync->{max_size} = $opt->{max_size}) {
$sync->{index_oid} = \&index_oid;
}
# work forwards through history