]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei + ipc: simplify process reaping
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index fba168613d96a2cdec8f8baec1d350e8dff76d40..fd2c8a37c0aab60b8f0198ea7b7b60ed3a256b7b 100644 (file)
@@ -282,11 +282,9 @@ sub each_remote_eml { # callback for MboxReader->mboxrd
        my $xoids = $lei->{ale}->xoids_for($eml, 1);
        my $smsg = bless {}, 'PublicInbox::Smsg';
        if ($self->{import_sto} && !$xoids) {
-               my $res = $self->{import_sto}->wq_do('add_eml', $eml);
-               if (ref($res) eq ref($smsg)) { # totally new message
-                       $smsg = $res;
-                       $smsg->{kw} = []; # short-circuit xsmsg_vmd
-               }
+               my ($res, $kw) = $self->{import_sto}->wq_do('add_eml', $eml);
+               $smsg = $res if ref($res) eq ref($smsg); # totally new message
+               $smsg->{kw} = $kw; # short-circuit xsmsg_vmd
        }
        $smsg->{blob} //= $xoids ? (keys(%$xoids))[0]
                                : $lei->git_oid($eml)->hexdigest;
@@ -415,14 +413,14 @@ sub query_done { # EOF callback for main daemon
        my ($lei) = @_;
        local $PublicInbox::LEI::current_lei = $lei;
        my $l2m = delete $lei->{l2m};
-       $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
-       if (my $lxs = delete $lei->{lxs}) {
-               $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
-       }
+       delete $lei->{lxs};
        ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                warn "BUG: {sto} missing with --mail-sync";
        $lei->sto_done_request if $lei->{sto};
-       my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+       if (my $v2w = delete $lei->{v2w}) {
+               $v2w->wq_do('done');
+               $v2w->wq_close;
+       }
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress
@@ -468,7 +466,7 @@ sub do_post_augment {
        if ($err) {
                if (my $lxs = delete $lei->{lxs}) {
                        $lxs->wq_kill('-TERM');
-                       $lxs->wq_close(0, undef, $lei);
+                       $lxs->wq_close;
                }
                $lei->fail("$err");
        }
@@ -516,7 +514,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process
        if ($self->{-do_lcat}) {
                $self->wq_io_do('lcat_dump', []);
        }
-       $self->wq_close(1); # lei_xsearch workers stop when done
+       $self->wq_close; # lei_xsearch workers stop when done
 }
 
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
@@ -571,12 +569,14 @@ sub do_query {
                }
                $l2m->wq_workers_start('lei2mail', undef,
                                        $lei->oldset, { lei => $lei });
+               $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                delete $l2m->{au_peers};
        }
        $self->wq_workers_start('lei_xsearch', undef,
                                $lei->oldset, { lei => $lei });
+       $self->wq_wait_async(\&xsearch_done_wait, $lei);
        my $op_c = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
        @$end = ();