- @io = ();
- close $qry_done; # fully closed when children are done
-
- # query_done will run when query_*mset close $qry_done
- if ($lei_orig->{sock}) { # watch for client premature exit
- require PublicInbox::EOFpipe;
- PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
- $lei_orig->{lxs} = $self;
- $lei_orig->event_step_init;
+ close $io->[0]; # qry_status_wr
+ @$io = ();
+}
+
+sub query_prepare { # wq_do
+ my ($self, $lei) = @_;
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
+ if (my $l2m = $lei->{l2m}) {
+ eval { $l2m->do_augment($lei) };
+ return $lei->fail($@) if $@;
+ }
+ # trigger PublicInbox::OpPipe->event_step
+ my $qry_status_wr = $lei->{0} or
+ return $lei->fail('BUG: qry_status_wr missing');
+ $qry_status_wr->autoflush(1);
+ print $qry_status_wr '.' or # this should never fail...
+ return $lei->fail("BUG? print qry_status_wr: $!");
+}
+
+sub do_query {
+ my ($self, $lei_orig, $srcs) = @_;
+ my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+ $io[0] = undef;
+ pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+
+ $lei_orig->{lxs} = $self;
+ $lei_orig->event_step_init; # wait for shutdowns
+ my $op_map = { '' => [ \&query_done, $lei_orig ] };
+ my $in_loop = exists $lei_orig->{sock};
+ my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+ if (my $l2m = $lei->{l2m}) {
+ $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
+ $io[1] = $lei_orig->{1};
+ $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
+ $self->wq_do('query_prepare', \@io, $lei);
+ $opp->event_step if !$in_loop;