X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FSearchIdxShard.pm;h=000abd9413b5ba5159f2d540f8344805564c6ad9;hb=23af251dd607c4e75ab1e68063f2c885c48cc035;hp=68644bc00a57128e920c9e622aa9ddf109177358;hpb=bf8df8160076d7a1231e0858ca4c243df3b7c860;p=public-inbox.git diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 68644bc0..000abd94 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -21,6 +21,14 @@ sub new { if ($v2w->{parallel}) { local $self->{-v2w_afc} = $v2w; $self->ipc_worker_spawn("shard[$shard]"); + # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size for + # inputs speeds V2Writable batch imports across 8 cores by + # nearly 20%. Since any of our responses are small, make + # the response pipe as small as possible + if ($^O eq 'linux') { + fcntl($self->{-ipc_req}, 1031, 1048576); + fcntl($self->{-ipc_res}, 1031, 4096); + } } $self; } @@ -36,43 +44,17 @@ sub _worker_done { sub ipc_atfork_child { # called automatically before ipc_worker_loop my ($self) = @_; my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing'; - $v2w->atfork_child; # calls shard_atfork_child on our siblings + $v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__} $self->begin_txn_lazy; # caller must capture this: PublicInbox::OnDestroy->new($$, \&_worker_done, $self); } -sub index_raw { - my ($self, $msgref, $eml, $smsg, $eidx_key) = @_; - if ($eml) { - undef($$msgref) if $msgref; - } else { # --xapian-only + --sequential-shard: - $eml = PublicInbox::Eml->new($msgref); - } +sub index_eml { + my ($self, $eml, $smsg, $eidx_key) = @_; $smsg->{eidx_key} = $eidx_key if defined $eidx_key; - $self->ipc_do('add_message', $eml, $smsg); -} - -sub shard_add_eidx_info { - my ($self, $docid, $eidx_key, $eml) = @_; - $self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); -} - -sub shard_remove_eidx_info { - my ($self, $docid, $eidx_key, $eml) = @_; - $self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml); -} - -# needed when there's multiple IPC workers and the parent forking -# causes newer siblings to inherit older siblings sockets -sub shard_atfork_child { - my ($self) = @_; - my $pid = delete($self->{-ipc_worker_pid}) or - die "BUG: $$ no -ipc_worker_pid"; - my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock"; - $pid == $$ and die "BUG: $$ shard_atfork_child called on itself"; - close($s1) or die "close -ipc_sock: $!"; + $self->ipc_do('add_xapian', $eml, $smsg); } # wait for return to determine when ipc_do('commit_txn_lazy') is done @@ -93,33 +75,4 @@ sub shard_close { $self->ipc_worker_stop; } -sub shard_remove { - my ($self, $num) = @_; - $self->ipc_do('remove_by_docid', $num); -} - -sub shard_set_keywords { - my ($self, $docid, @kw) = @_; - $self->ipc_do('set_keywords', $docid, @kw); -} - -sub shard_remove_keywords { - my ($self, $docid, @kw) = @_; - $self->ipc_do('remove_keywords', $docid, @kw); -} - -sub shard_add_keywords { - my ($self, $docid, @kw) = @_; - $self->ipc_do('add_keywords', $docid, @kw); -} - -sub shard_over_check { - my ($self, $over) = @_; - if ($self->{-ipc_sock} && $over->{dbh}) { - # can't send DB handles over IPC - $over = ref($over)->new($over->{dbh}->sqlite_db_filename); - } - $self->ipc_do('over_check', $over); -} - 1;