- (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, !$lei->{oneshot});
- my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
- delete($lei->{pkt_op});
-
- $lei->event_step_init; # wait for shutdowns
+ $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
+ my $od = PublicInbox::OnDestroy->new($$, \&delete_pkt_op, $lei);
+ ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops);
+ $lei->{1}->autoflush(1);
+ $lei->start_pager if delete $lei->{need_pager};
+ $lei->{ovv}->ovv_begin($lei);
+ if ($l2m) {
+ $l2m->pre_augment($lei);
+ if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+ $lei->start_mua;
+ }
+ $l2m->wq_workers_start('lei2mail', undef,
+ $lei->oldset, { lei => $lei });
+ pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
+ # 1031: F_SETPIPE_SZ
+ fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+ }
+ if (!$lei->{opt}->{threads} && locals($self)) { # for query_mset
+ # lei->{git_tmp} is set for wq_wait_old so we don't
+ # delete until all lei2mail + lei_xsearch workers are reaped
+ $lei->{git_tmp} = $self->{git_tmp} = git_tmp($self);
+ }
+ $self->wq_workers_start('lei_xsearch', undef,
+ $lei->oldset, { lei => $lei });
+ my $op = delete $lei->{pkt_op_c};
+ delete $lei->{pkt_op_p};
+ $self->{threads} = $lei->{opt}->{threads};