}, __PACKAGE__;
$self->{shards} = $self->count_shards || nproc_shards($opt->{creat});
my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3");
- $oidx->{-no_fsync} = 1 if $opt->{-no_fsync};
+ $self->{-no_fsync} = $oidx->{-no_fsync} = 1 if !$opt->{fsync};
$self->{oidx} = $oidx;
$self
}
sub attach_inbox {
my ($self, $ibx) = @_;
- my $key = $ibx->eidx_key;
- if (!$ibx->over || !$ibx->mm) {
- warn "W: skipping $key (unindexed)\n";
- return;
- }
- if (!defined($ibx->uidvalidity)) {
- warn "W: skipping $key (no UIDVALIDITY)\n";
- return;
- }
- $self->{ibx_map}->{$key} //= do {
+ $self->{ibx_map}->{$ibx->eidx_key} //= do {
push @{$self->{ibx_list}}, $ibx;
$ibx;
}
$heads;
}
+sub _ibx_index_reject ($) {
+ my ($ibx) = @_;
+ $ibx->mm // return 'unindexed, no msgmap.sqlite3';
+ $ibx->uidvalidity // return 'no UIDVALIDITY';
+ $ibx->over // return 'unindexed, no over.sqlite3';
+ undef;
+}
+
sub _sync_inbox ($$$) {
my ($self, $sync, $ibx) = @_;
+ my $ekey = $ibx->eidx_key;
+ if (defined(my $err = _ibx_index_reject($ibx))) {
+ return "W: skipping $ekey ($err)";
+ }
$sync->{ibx} = $ibx;
$sync->{nr} = \(my $nr = 0);
my $v = $ibx->version;
- my $ekey = $ibx->eidx_key;
if ($v == 2) {
$sync->{epoch_max} = $ibx->max_git_epoch // return;
sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable
} elsif ($v == 1) {
my $uv = $ibx->uidvalidity;
my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
- my $head = $ibx->mm->last_commit;
- unless (defined $head) {
- warn "E: $ibx->{inboxdir} is not indexed\n";
- return;
- }
+ my $head = $ibx->mm->last_commit //
+ return "E: $ibx->{inboxdir} is not indexed";
my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head);
my $unit = { stack => $stk, git => $ibx->git };
push @{$sync->{todo}}, $unit;
} else {
- warn "E: $ekey unsupported inbox version (v$v)\n";
- return;
+ return "E: $ekey unsupported inbox version (v$v)";
}
for my $unit (@{delete($sync->{todo}) // []}) {
last if $sync->{quit};
}
$self->{midx}->index_ibx($ibx) unless $sync->{quit};
$ibx->git->cleanup; # done with this inbox, now
+ undef;
}
sub gc_unref_doc ($$$$) {
$self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
}
+sub _fd_constrained ($) {
+ my ($self) = @_;
+ $self->{-fd_constrained} //= do {
+ my $soft;
+ if (eval { require BSD::Resource; 1 }) {
+ my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+ ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
+ } else {
+ chomp($soft = `sh -c 'ulimit -n'`);
+ }
+ if (defined($soft)) {
+ my $want = scalar(@{$self->{ibx_list}}) + 64; # estimate
+ my $ret = $want > $soft;
+ if ($ret) {
+ warn <<EOF;
+RLIMIT_NOFILE=$soft insufficient (want: $want), will close DB handles early
+EOF
+ }
+ $ret;
+ } else {
+ warn "Unable to determine RLIMIT_NOFILE: $@\n";
+ 1;
+ }
+ };
+}
+
sub _reindex_finalize ($$$) {
my ($req, $smsg, $eml) = @_;
my $sync = $req->{sync};
my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
$x->{num} = delete($x->{xnum}) // die '{xnum} unset';
$ibx = _ibx_for($self, $sync, $x);
- my $e = $ibx->over->get_art($x->{num});
- $e->{blob} eq $x->{blob} or die <<EOF;
+ if (my $over = $ibx->over) {
+ my $e = $over->get_art($x->{num});
+ $e->{blob} eq $x->{blob} or die <<EOF;
$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
EOF
- push @todo, $ibx, $e;
+ push @todo, $ibx, $e;
+ $over->dbh_close if _fd_constrained($self);
+ } else {
+ die "$ibx->{inboxdir}: over.sqlite3 unusable: $!\n";
+ }
}
undef $by_chash;
while (my ($ibx, $e) = splice(@todo, 0, 2)) {
sub _reindex_inbox ($$$) {
my ($self, $sync, $ibx) = @_;
- local $self->{current_info} = $ibx->eidx_key;
- _reindex_check_unseen($self, $sync, $ibx);
- _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+ my $ekey = $ibx->eidx_key;
+ local $self->{current_info} = $ekey;
+ if (defined(my $err = _ibx_index_reject($ibx))) {
+ warn "W: cannot reindex $ekey ($err)\n";
+ } else {
+ _reindex_check_unseen($self, $sync, $ibx);
+ _reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+ }
delete @$ibx{qw(over mm search git)}; # won't need these for a bit
}
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
last if $sync->{quit};
- _sync_inbox($self, $sync, $ibx);
+ my $err = _sync_inbox($self, $sync, $ibx);
+ delete @$ibx{qw(mm over)};
+ warn $err, "\n" if defined($err);
}
$self->{oidx}->rethread_done($opt) unless $sync->{quit};
eidxq_process($self, $sync) unless $sync->{quit};
PublicInbox::V2Writable::write_alternates($info_dir, $mode, $o);
}
$self->parallel_init($self->{indexlevel});
- $self->umask_prepare;
$self->with_umask(\&_idx_init, $self, $opt);
$self->{oidx}->begin_lazy;
$self->{oidx}->eidx_prep;
no warnings 'once';
*done = \&PublicInbox::V2Writable::done;
-*umask_prepare = \&PublicInbox::InboxWritable::umask_prepare;
*with_umask = \&PublicInbox::InboxWritable::with_umask;
*parallel_init = \&PublicInbox::V2Writable::parallel_init;
*nproc_shards = \&PublicInbox::V2Writable::nproc_shards;