From 39c8c7a8db43cb0e338cfcedaa81afd8627e9ff4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 4 Feb 2021 00:59:21 -0900 Subject: [PATCH] lei q: delay worker spawn Now that --stdin support is sorted, we can delay spawning workers until we know the query is ready-to-run. --- lib/PublicInbox/LeiQuery.pm | 19 +++++-------------- lib/PublicInbox/LeiXSearch.pm | 6 ++++++ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 4fe40400..6b1aa40c 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -75,21 +75,12 @@ sub lei_q { $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY" my $nproc = $lxs->detect_nproc; # don't memoize, schedtool(1) exists $xj = $nproc if $xj > $nproc; - PublicInbox::LeiOverview->new($self) or return; - $self->atfork_prepare_wq($lxs); - $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset); - delete $lxs->{-ipc_atfork_child_close}; - if (my $l2m = $self->{l2m}) { - if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) { - return $self->fail("`$mj' writer jobs must be >= 1"); - } - $mj //= $nproc; - $self->atfork_prepare_wq($l2m); - $l2m->wq_workers_start('lei2mail', $mj, $self->oldset); - delete $l2m->{-ipc_atfork_child_close}; + $lxs->{jobs} = $xj; + if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) { + return $self->fail("`$mj' writer jobs must be >= 1"); } - - # no forking workers after this + $self->{l2m}->{jobs} = ($mj // $nproc) if $self->{l2m}; + PublicInbox::LeiOverview->new($self) or return; my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 965617b5..ab66717c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -406,7 +406,13 @@ sub do_query { $lei->{ovv}->ovv_begin($lei); my ($au_done, $zpipe); my $l2m = $lei->{l2m}; + $lei->atfork_prepare_wq($self); + $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset); + delete $self->{-ipc_atfork_child_close}; if ($l2m) { + $lei->atfork_prepare_wq($l2m); + $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset); + delete $l2m->{-ipc_atfork_child_close}; pipe($lei->{startq}, $au_done) or die "pipe: $!"; # 1031: F_SETPIPE_SZ fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; -- 2.44.0