sub count_shards ($) {
my ($self) = @_;
- my $n = 0;
- my $xpfx = $self->{xpfx};
-
# always load existing shards in case core count changes:
# Also, shard count may change while -watch is running
- # due to "xcpdb --reshard"
- if (-d $xpfx) {
- my $XapianDatabase;
- foreach my $shard (<$xpfx/*>) {
- -d $shard && $shard =~ m!/[0-9]+\z! or next;
- $XapianDatabase //= do {
- require PublicInbox::Search;
- PublicInbox::Search::load_xapian();
- $PublicInbox::Search::X{Database};
- };
- eval {
- $XapianDatabase->new($shard)->close;
- $n++;
- };
- }
- }
- $n;
+ my $srch = $self->{ibx}->search or return 0;
+ delete $self->{ibx}->{search};
+ $srch->{nshard} // 0
}
sub new {
total_bytes => 0,
current_info => '',
xpfx => $xpfx,
- over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3"),
+ oidx => PublicInbox::OverIdx->new("$xpfx/over.sqlite3"),
lock_path => "$dir/inbox.lock",
# limit each git repo (epoch) to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
last_commit => [], # git epoch -> commit
};
- $self->{over}->{-no_fsync} = 1 if $v2ibx->{-no_fsync};
+ $self->{oidx}->{-no_fsync} = 1 if $v2ibx->{-no_fsync};
$self->{shards} = count_shards($self) || nproc_shards($creat);
bless $self, $class;
}
sub do_idx ($$$$) {
my ($self, $msgref, $mime, $smsg) = @_;
$smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
- $self->{over}->add_overview($mime, $smsg);
+ $self->{oidx}->add_overview($mime, $smsg);
my $idx = idx_shard($self, $smsg->{num} % $self->{shards});
$idx->index_raw($msgref, $mime, $smsg);
my $n = $self->{transact_bytes} += $smsg->{raw_bytes};
if ($altid && grep(/:file=msgmap\.sqlite3\z/, @$altid)) {
my $num = $self->{mm}->num_for($mid);
- if (defined $num && !$self->{over}->get_art($num)) {
+ if (defined $num && !$self->{oidx}->get_art($num)) {
return ($num, $mid);
}
}
sub _idx_init { # with_umask callback
my ($self, $opt) = @_;
$self->lock_acquire unless $opt && $opt->{-skip_lock};
- $self->{over}->create;
+ $self->{oidx}->create;
# xcpdb can change shard count while -watch is idle
my $nshards = count_shards($self);
} else {
$im = $self->importer;
}
- my $over = $self->{over};
+ my $oidx = $self->{oidx};
my $chashes = content_hashes($old_eml);
my $removed = [];
my $mids = mids($old_eml);
foreach my $mid (@$mids) {
my %gone; # num => [ smsg, $mime, raw ]
my ($id, $prev);
- while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
+ while (my $smsg = $oidx->next_by_mid($mid, \$id, \$prev)) {
my $msg = get_blob($self, $smsg);
if (!defined($msg)) {
warn "broken smsg for $mid\n";
$dbh->commit;
# SQLite overview is third
- $self->{over}->commit_lazy;
+ $self->{oidx}->commit_lazy;
# Now deal with Xapian
if ($wait) {
# public
sub barrier { checkpoint($_[0], 1) };
+# true if locked and active
+sub active { !!$_[0]->{im} }
+
# public
sub done {
my ($self) = @_;
$err .= "shard close: $@\n" if $@;
}
}
- eval { $self->{over}->disconnect };
- $err .= "over disconnect: $@\n" if $@;
+ eval { $self->{oidx}->dbh_close };
+ $err .= "over close: $@\n" if $@;
delete $self->{bnote};
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
sub content_exists ($$$) {
my ($self, $mime, $mid) = @_;
- my $over = $self->{over};
+ my $oidx = $self->{oidx};
my $chashes = content_hashes($mime);
my ($id, $prev);
- while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
+ while (my $smsg = $oidx->next_by_mid($mid, \$id, \$prev)) {
my $msg = get_blob($self, $smsg);
if (!defined($msg)) {
warn "broken smsg for $mid\n";
}
}
if (!defined($num)) { # reuse if reindexing (or duplicates)
- my $over = $self->{over};
+ my $oidx = $self->{oidx};
for my $mid (@$mids) {
- ($num, $mid0) = $over->num_mid0_for_oid($oid, $mid);
+ ($num, $mid0) = $oidx->num_mid0_for_oid($oid, $mid);
last if defined $num;
}
}
sub unindex_oid_remote ($$$) {
my ($self, $oid, $mid) = @_;
- my @removed = $self->{over}->remove_oid($oid, $mid);
+ my @removed = $self->{oidx}->remove_oid($oid, $mid);
for my $num (@removed) {
my $idx = idx_shard($self, $num % $self->{shards});
$idx->shard_remove($oid, $num);
my $mm = $self->{mm};
my $mids = mids(PublicInbox::Eml->new($bref));
undef $$bref;
- my $over = $self->{over};
+ my $oidx = $self->{oidx};
foreach my $mid (@$mids) {
my %gone;
my ($id, $prev);
- while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
+ while (my $smsg = $oidx->next_by_mid($mid, \$id, \$prev)) {
$gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob};
}
my $n = scalar(keys(%gone)) or next;
$self->idx_init($opt); # acquire lock
fill_alternates($self, $epoch_max);
- $self->{over}->rethread_prepare($opt);
+ $self->{oidx}->rethread_prepare($opt);
my $sync = {
need_checkpoint => \(my $bool = 0),
unindex_range => {}, # EPOCH => oid_old..oid_new
}
# work forwards through history
index_epoch($self, $sync, $_) for (0..$epoch_max);
- $self->{over}->rethread_done($opt);
+ $self->{oidx}->rethread_done($opt);
$self->done;
if (my $nr = $sync->{nr}) {