use v5.10.1;
use parent qw(PublicInbox::Lock);
use PublicInbox::SearchIdxShard;
-use PublicInbox::IdxStack;
use PublicInbox::Eml;
use PublicInbox::Git;
use PublicInbox::Import;
$self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb);
}
-sub batch_bytes ($) {
- my ($self) = @_;
- ($self->{parallel} ? $self->{shards} : 1) *
- $PublicInbox::SearchIdx::BATCH_BYTES;
-}
-
# indexes a message, returns true if checkpointing is needed
sub do_idx ($$$$) {
my ($self, $msgref, $mime, $smsg) = @_;
my $idx = idx_shard($self, $smsg->{num} % $self->{shards});
$idx->index_raw($msgref, $mime, $smsg);
my $n = $self->{transact_bytes} += $smsg->{raw_bytes};
- $n >= batch_bytes($self);
+ $n >= $self->{batch_bytes};
}
sub _add {
# xcpdb can change shard count while -watch is idle
my $nshards = count_shards($self);
$self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
+ $self->{batch_bytes} = $opt->{batch_size} //
+ $PublicInbox::SearchIdx::BATCH_BYTES;
+ $self->{batch_bytes} *= $self->{shards} if $self->{parallel};
# need to create all shards before initializing msgmap FD
# idx_shards must be visible to all forked processes
my $barrier = $self->barrier_init(scalar @$shards);
# each shard needs to issue a barrier command
- $_->remote_barrier for @$shards;
+ $_->shard_barrier for @$shards;
# wait for each Xapian shard
$self->barrier_wait($barrier);
} else {
- $_->remote_commit for @$shards;
+ $_->shard_commit for @$shards;
}
# last_commit is special, don't commit these until
my $shards = delete $self->{idx_shards};
if ($shards) {
for (@$shards) {
- eval { $_->remote_close };
+ eval { $_->shard_close };
$err .= "shard close: $@\n" if $@;
}
}
- eval { $self->{over}->disconnect };
- $err .= "over disconnect: $@\n" if $@;
+ eval { $self->{over}->dbh_close };
+ $err .= "over close: $@\n" if $@;
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
}
# allow -watch or -mda to write...
- $self->idx_init; # reacquire lock
+ $self->idx_init($sync->{-opt}); # reacquire lock
$mm_tmp->atfork_parent if $mm_tmp;
}
my $opt = $sync->{-opt};
my $pr = $opt->{-progress} if (($opt->{verbose} || 0) > 1);
my $cur = $sync->{ranges}->[$i] or do {
- $pr->("$i.git indexing all of $tip") if $pr;
+ $pr->("$i.git indexing all of $tip\n") if $pr;
return $tip; # all of it
};
my @removed = $self->{over}->remove_oid($oid, $mid);
for my $num (@removed) {
my $idx = idx_shard($self, $num % $self->{shards});
- $idx->remote_remove($oid, $num);
+ $idx->shard_remove($oid, $num);
}
}
$pr->("Xapian indexlevel=$ibx->{indexlevel} ".
"$beg..$end (% $step)\n");
}
- my $batch_bytes = batch_bytes($self);
for (my $num = $beg; $num <= $end; $num += $step) {
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{v2w} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
- if ($self->{transact_bytes} >= $batch_bytes) {
+ if ($self->{transact_bytes} >= $self->{batch_bytes}) {
${$sync->{nr}} = $num;
reindex_checkpoint($self, $sync);
}
$self->{current_info} = "$i.git $oid";
if ($f eq 'm') {
my $arg = { %$sync, autime => $at, cotime => $ct };
- if ($sync->{index_max_size}) {
+ if ($sync->{max_size}) {
$all->check_async($oid, \&check_size, $arg);
} else {
$all->cat_async($oid, \&index_oid, $arg);
sub xapian_only {
my ($self, $opt, $sync, $art_beg) = @_;
- my $seq = $opt->{sequentialshard};
+ my $seq = $opt->{sequential_shard};
$art_beg //= 0;
local $self->{parallel} = 0 if $seq;
$self->idx_init($opt); # acquire lock
$sync->{art_end} = $art_end;
if ($seq || !$self->{parallel}) {
my $shard_end = $self->{shards} - 1;
- for (0..$shard_end) {
- index_xap_step($self, $sync, $art_beg + $_)
+ for my $i (0..$shard_end) {
+ index_xap_step($self, $sync, $art_beg + $i);
+ if ($i != $shard_end) {
+ reindex_checkpoint($self, $sync);
+ }
}
} else { # parallel (maybe)
index_xap_step($self, $sync, $art_beg, 1);
sub index_sync {
my ($self, $opt) = @_;
$opt //= $_[1] //= {};
- goto \&xapian_only if $opt->{xapianonly};
+ goto \&xapian_only if $opt->{xapian_only};
my $pr = $opt->{-progress};
my $epoch_max;
my $latest = git_dir_latest($self, \$epoch_max);
return unless defined $latest;
- my $seq = $opt->{sequentialshard};
+ my $seq = $opt->{sequential_shard};
my $art_beg; # the NNTP article number we start xapian_only at
my $idxlevel = $self->{ibx}->{indexlevel};
local $self->{ibx}->{indexlevel} = 'basic' if $seq;
$art_beg++ if defined($art_beg);
}
}
- if ($sync->{index_max_size} = $self->{ibx}->{index_max_size}) {
+ if ($sync->{max_size} = $opt->{max_size}) {
$sync->{index_oid} = \&index_oid;
}
# work forwards through history
xapian_only($self, $opt, $sync, $art_beg);
}
+ # --reindex on the command-line
+ if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') {
+ $self->lock_acquire;
+ my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
+ if (my $xdb = $s0->idx_acquire) {
+ my $n = $xdb->get_metadata('has_threadid');
+ $xdb->set_metadata('has_threadid', '1') if $n ne '1';
+ }
+ $s0->idx_release;
+ $self->lock_release;
+ }
+
# reindex does not pick up new changes, so we rerun w/o it:
if ($opt->{reindex}) {
my %again = %$opt;