my ($lei) = @_;
local $PublicInbox::LEI::current_lei = $lei;
my $l2m = delete $lei->{l2m};
- $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
- }
+ delete $lei->{lxs};
($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
warn "BUG: {sto} missing with --mail-sync";
$lei->sto_done_request if $lei->{sto};
- my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+ if (my $v2w = delete $lei->{v2w}) {
+ $v2w->wq_do('done');
+ $v2w->wq_close;
+ }
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
if ($err) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill('-TERM');
- $lxs->wq_close(0, undef, $lei);
+ $lxs->wq_close;
}
$lei->fail("$err");
}
if ($self->{-do_lcat}) {
$self->wq_io_do('lcat_dump', []);
}
- $self->wq_close(1); # lei_xsearch workers stop when done
+ $self->wq_close; # lei_xsearch workers stop when done
}
sub incr_start_query { # called whenever an l2m shard starts do_post_auth
}
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
+ $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
+ $self->wq_wait_async(\&xsearch_done_wait, $lei);
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();