if ($v2w->{parallel}) {
local $self->{-v2w_afc} = $v2w;
$self->ipc_worker_spawn("shard[$shard]");
+ # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size for
+ # inputs speeds V2Writable batch imports across 8 cores by
+ # nearly 20%. Since any of our responses are small, make
+ # the response pipe as small as possible
+ if ($^O eq 'linux') {
+ fcntl($self->{-ipc_req}, 1031, 1048576);
+ fcntl($self->{-ipc_res}, 1031, 4096);
+ }
}
$self;
}
sub ipc_atfork_child { # called automatically before ipc_worker_loop
my ($self) = @_;
my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
- $v2w->atfork_child; # calls shard_atfork_child on our siblings
+ $v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings
$v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
$self->begin_txn_lazy;
# caller must capture this:
PublicInbox::OnDestroy->new($$, \&_worker_done, $self);
}
-sub index_raw {
- my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
- if ($eml) {
- undef($$msgref) if $msgref;
- } else { # --xapian-only + --sequential-shard:
- $eml = PublicInbox::Eml->new($msgref);
- }
+sub index_eml {
+ my ($self, $eml, $smsg, $eidx_key) = @_;
$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
- $self->ipc_do('add_message', $eml, $smsg);
-}
-
-sub shard_add_eidx_info {
- my ($self, $docid, $eidx_key, $eml) = @_;
- $self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
-}
-
-sub shard_remove_eidx_info {
- my ($self, $docid, $eidx_key, $eml) = @_;
- $self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml);
-}
-
-# needed when there's multiple IPC workers and the parent forking
-# causes newer siblings to inherit older siblings sockets
-sub shard_atfork_child {
- my ($self) = @_;
- my $pid = delete($self->{-ipc_worker_pid}) or
- die "BUG: $$ no -ipc_worker_pid";
- my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
- $pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
- close($s1) or die "close -ipc_sock: $!";
+ $self->ipc_do('add_xapian', $eml, $smsg);
}
# wait for return to determine when ipc_do('commit_txn_lazy') is done
$self->ipc_worker_stop;
}
-sub shard_remove {
- my ($self, $num) = @_;
- $self->ipc_do('remove_by_docid', $num);
-}
-
-sub shard_set_keywords {
- my ($self, $docid, @kw) = @_;
- $self->ipc_do('set_keywords', $docid, @kw);
-}
-
-sub shard_remove_keywords {
- my ($self, $docid, @kw) = @_;
- $self->ipc_do('remove_keywords', $docid, @kw);
-}
-
-sub shard_add_keywords {
- my ($self, $docid, @kw) = @_;
- $self->ipc_do('add_keywords', $docid, @kw);
-}
-
sub shard_over_check {
my ($self, $over) = @_;
- if ($self->{-ipc_sock} && $over->{dbh}) {
+ if ($self->{-ipc_req} && $over->{dbh}) {
# can't send DB handles over IPC
$over = ref($over)->new($over->{dbh}->sqlite_db_filename);
}