]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei: test SIGPIPE, stop xsearch workers on client abort
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index c030b2b29b6f8784a9e733d25192532429f62de8..d06b6f1d63ad4a7480a6324949278722fe606ed8 100644 (file)
@@ -92,7 +92,9 @@ sub _mset_more ($$) {
 
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
-       local %SIG = (%SIG, $lei->atfork_child_wq($self));
+       my %sig = $lei->atfork_child_wq($self);
+       local @SIG{keys %sig} = values %sig;
+
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        unless ($srch && $over) {
                my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei, $srcs) = @_;
+       my %sig = $lei->atfork_child_wq($self);
+       local @SIG{keys %sig} = values %sig;
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
-       local %SIG = (%SIG, $lei->atfork_child_wq($self));
        $self->attach_external($_) for @$srcs;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        do {
@@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub query_done { # PublicInbox::EOFpipe callback
+       my ($lei) = @_;
+       $lei->{ovv}->ovv_end($lei);
+       $lei->dclose;
+}
+
 sub do_query {
-       my ($self, $lei_orig, $qry_done, $srcs) = @_;
+       my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+       pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
        $io[0] = $qry_done; # don't need stdin
        $io[1]->autoflush(1);
        $io[2]->autoflush(1);
@@ -160,9 +171,20 @@ sub do_query {
        for my $rmt (@{$self->{remotes} // []}) {
                $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
-
-       # sent off to children, they will drop remaining references to it
-       close $qry_done;
+       @io = ();
+       close $qry_done; # fully closed when children are done
+
+       # query_done will run when query_*mset close $qry_done
+       if ($lei_orig->{sock}) { # watch for client premature exit
+               require PublicInbox::EOFpipe;
+               PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+               $lei_orig->{lxs} = $self;
+               $lei_orig->event_step_init;
+       } else {
+               $self->wq_close;
+               read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+               query_done($lei_orig); # may SIGPIPE
+       }
 }
 
 sub ipc_atfork_child {