- # may redirect $lei->{1} for mbox
- my $zpipe = $l2m->pre_augment($lei_orig);
- $io[1] = $lei_orig->{1};
- pipe(my ($startq, $au_done)) or die "pipe: $!";
- $done_op->{'.'} = [ \&do_post_augment, $lei_orig,
- $zpipe, $au_done ];
- local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
- die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
- $self->wq_do('query_prepare', \@io, $lei);
- fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
- $io[5] = $startq;
+ pipe($lei->{startq}, $au_done) or die "pipe: $!";
+ # 1031: F_SETPIPE_SZ
+ fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+ $zpipe = $l2m->pre_augment($lei);
+ }
+ my $in_loop = exists $lei->{sock};
+ my $ops = {
+ '|' => [ \&sigpipe_handler, $lei ],
+ '!' => [ \&fail_handler, $lei ],
+ '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
+ '' => [ \&query_done, $lei ],
+ 'mset_progress' => [ \&mset_progress, $lei ],
+ };
+ (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
+ my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
+ delete($lei->{pkt_op});
+
+ $lei->event_step_init; # wait for shutdowns
+ if ($l2m) {
+ $self->wq_do('query_prepare', \@io, $lei_ipc);