$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
}
+# $startq will EOF when query_prepare is done augmenting and allow
+# query_mset and query_thread_mset to proceed.
+sub wait_startq ($) {
+ my ($startq) = @_;
+ $_[0] = undef;
+ read($startq, my $query_prepare_done, 1);
+}
+
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
+ my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
+ wait_startq($startq) if $startq;
next if $dedupe->is_smsg_dup($smsg);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
+ my $startq = delete $self->{5};
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
my $mo = { %{$lei->{mset_opt}} };
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
+ wait_startq($startq) if $startq;
next if $dedupe->is_smsg_dup($smsg);
$each_smsg->($smsg, $it);
}
@$io = ();
}
-sub query_prepare { # wq_do
+sub query_prepare { # for wq_do,
my ($self, $lei) = @_;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
- if (my $l2m = $lei->{l2m}) {
- eval { $l2m->do_augment($lei) };
- return $lei->fail($@) if $@;
- }
- # trigger PublicInbox::OpPipe->event_step
- my $qry_status_wr = $lei->{0} or
- return $lei->fail('BUG: qry_status_wr missing');
- $qry_status_wr->autoflush(1);
- print $qry_status_wr '.' or # this should never fail...
- return $lei->fail("BUG? print qry_status_wr: $!");
+ eval { $lei->{l2m}->do_augment($lei) };
+ $lei->fail($@) if $@;
}
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
$io[0] = undef;
- pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+ pipe(my $done, $io[0]) or die "pipe $!";
$lei_orig->event_step_init; # wait for shutdowns
- my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
+ my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
my $in_loop = exists $lei_orig->{sock};
- my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+ $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
my $l2m = $lei->{l2m};
if ($l2m) {
$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
$io[1] = $lei_orig->{1};
- $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
- $self->wq_do('query_prepare', \@io, $lei);
- $opp->event_step if !$in_loop;
- } else {
- start_query($self, \@io, $lei, $srcs);
+ my @l2m_io = (undef, @io[1..$#io]);
+ pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
+ $self->wq_do('query_prepare', \@l2m_io, $lei);
+ $io[4] //= *STDERR{GLOB};
+ die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
+ fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
+ $io[5] = $startq;
}
+ start_query($self, \@io, $lei, $srcs);
unless ($in_loop) {
my @pids = $self->wq_close;
# for the $lei->atfork_child_wq PIPE handler:
- $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
- $opp->event_step;
+ $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+ $done->event_step;
my $ipc_worker_reap = $self->can('ipc_worker_reap');
if (my $l2m_pids = delete $self->{l2m_pids}) {
dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
sub ipc_atfork_prepare {
my ($self) = @_;
- # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
- $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
+ # (0: qry_status_wr, 1: stdout|mbox, 2: stderr,
+ # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}