X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiXSearch.pm;h=fd2c8a37c0aab60b8f0198ea7b7b60ed3a256b7b;hb=a1733d3406dfbde52d1468e671edd1d76893f546;hp=3ec755280dc995250a1a4dac6d545b432748fbda;hpb=ea11b7b17d525d20a07d7f62c0334501c5a721b4;p=public-inbox.git diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 3ec75528..fd2c8a37 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -413,14 +413,14 @@ sub query_done { # EOF callback for main daemon my ($lei) = @_; local $PublicInbox::LEI::current_lei = $lei; my $l2m = delete $lei->{l2m}; - $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m; - if (my $lxs = delete $lei->{lxs}) { - $lxs->wq_wait_old(\&xsearch_done_wait, $lei); - } + delete $lei->{lxs}; ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and warn "BUG: {sto} missing with --mail-sync"; $lei->sto_done_request if $lei->{sto}; - my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef; + if (my $v2w = delete $lei->{v2w}) { + $v2w->wq_do('done'); + $v2w->wq_close; + } $lei->{ovv}->ovv_end($lei); my $start_mua; if ($l2m) { # close() calls LeiToMail reap_compress @@ -466,7 +466,7 @@ sub do_post_augment { if ($err) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill('-TERM'); - $lxs->wq_close(0, undef, $lei); + $lxs->wq_close; } $lei->fail("$err"); } @@ -514,7 +514,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process if ($self->{-do_lcat}) { $self->wq_io_do('lcat_dump', []); } - $self->wq_close(1); # lei_xsearch workers stop when done + $self->wq_close; # lei_xsearch workers stop when done } sub incr_start_query { # called whenever an l2m shard starts do_post_auth @@ -569,12 +569,14 @@ sub do_query { } $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); + $l2m->wq_wait_async(\&xsearch_done_wait, $lei); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); + $self->wq_wait_async(\&xsearch_done_wait, $lei); my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = ();