]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei + ipc: simplify process reaping
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 3ec755280dc995250a1a4dac6d545b432748fbda..fd2c8a37c0aab60b8f0198ea7b7b60ed3a256b7b 100644 (file)
@@ -413,14 +413,14 @@ sub query_done { # EOF callback for main daemon
        my ($lei) = @_;
        local $PublicInbox::LEI::current_lei = $lei;
        my $l2m = delete $lei->{l2m};
-       $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
-       if (my $lxs = delete $lei->{lxs}) {
-               $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
-       }
+       delete $lei->{lxs};
        ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                warn "BUG: {sto} missing with --mail-sync";
        $lei->sto_done_request if $lei->{sto};
-       my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+       if (my $v2w = delete $lei->{v2w}) {
+               $v2w->wq_do('done');
+               $v2w->wq_close;
+       }
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress
@@ -466,7 +466,7 @@ sub do_post_augment {
        if ($err) {
                if (my $lxs = delete $lei->{lxs}) {
                        $lxs->wq_kill('-TERM');
-                       $lxs->wq_close(0, undef, $lei);
+                       $lxs->wq_close;
                }
                $lei->fail("$err");
        }
@@ -514,7 +514,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process
        if ($self->{-do_lcat}) {
                $self->wq_io_do('lcat_dump', []);
        }
-       $self->wq_close(1); # lei_xsearch workers stop when done
+       $self->wq_close; # lei_xsearch workers stop when done
 }
 
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
@@ -569,12 +569,14 @@ sub do_query {
                }
                $l2m->wq_workers_start('lei2mail', undef,
                                        $lei->oldset, { lei => $lei });
+               $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                delete $l2m->{au_peers};
        }
        $self->wq_workers_start('lei_xsearch', undef,
                                $lei->oldset, { lei => $lei });
+       $self->wq_wait_async(\&xsearch_done_wait, $lei);
        my $op_c = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
        @$end = ();