# Copyright (C) 2018-2021 all contributors
# License: AGPL-3.0+
# Internal interface for a single Xapian shard in V2 inboxes.
# See L for more info on how we shard Xapian
package PublicInbox::SearchIdxShard;
use strict;
use v5.10.1;
use parent qw(PublicInbox::SearchIdx PublicInbox::IPC);
use PublicInbox::OnDestroy;
sub new {
my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
my $ibx = $v2w->{ibx};
my $self = $ibx ? $class->SUPER::new($ibx, 1, $shard)
: $class->eidx_shard_new($v2w, $shard);
# create the DB before forking:
$self->idx_acquire;
$self->set_metadata_once;
$self->idx_release;
if ($v2w->{parallel}) {
local $self->{-v2w_afc} = $v2w;
$self->ipc_worker_spawn("shard[$shard]");
}
$self;
}
sub _worker_done {
my ($self) = @_;
if ($self->need_xapian) {
die "$$ $0 xdb not released\n" if $self->{xdb};
}
die "$$ $0 still in transaction\n" if $self->{txn};
}
sub ipc_atfork_child { # called automatically before ipc_worker_loop
my ($self) = @_;
my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
$v2w->atfork_child; # calls shard_atfork_child on our siblings
$v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
$self->begin_txn_lazy;
# caller must capture this:
PublicInbox::OnDestroy->new($$, \&_worker_done, $self);
}
sub index_raw {
my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
if ($eml) {
undef($$msgref) if $msgref;
} else { # --xapian-only + --sequential-shard:
$eml = PublicInbox::Eml->new($msgref);
}
$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
$self->ipc_do('add_message', $eml, $smsg);
}
# needed when there's multiple IPC workers and the parent forking
# causes newer siblings to inherit older siblings sockets
sub shard_atfork_child {
my ($self) = @_;
my $pid = delete($self->{-ipc_worker_pid}) or
die "BUG: $$ no -ipc_worker_pid";
my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
$pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
close($s1) or die "close -ipc_sock: $!";
}
# wait for return to determine when ipc_do('commit_txn_lazy') is done
sub echo {
shift;
"@_";
}
sub idx_close {
my ($self) = @_;
die "transaction in progress $self\n" if $self->{txn};
$self->idx_release if $self->{xdb};
}
sub shard_close {
my ($self) = @_;
$self->ipc_do('idx_close');
$self->ipc_worker_stop;
}
sub shard_over_check {
my ($self, $over) = @_;
if ($self->{-ipc_sock} && $over->{dbh}) {
# can't send DB handles over IPC
$over = ref($over)->new($over->{dbh}->sqlite_db_filename);
}
$self->ipc_do('over_check', $over);
}
1;