]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: fix SIGPIPE handling from lei2mail workers
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 73fd17f49ba7b8dc671a4e8951949088f4826433..45a073a06289d3c103e6e3f6f2fe7859b0c8958f 100644 (file)
@@ -219,7 +219,7 @@ sub start_query { # always runs in main (lei-daemon) process
        @$io = ();
 }
 
-sub query_prepare { # for wq_do,
+sub query_prepare { # called by wq_do
        my ($self, $lei) = @_;
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
@@ -227,6 +227,18 @@ sub query_prepare { # for wq_do,
        $lei->fail($@) if $@;
 }
 
+sub sigpipe_handler {
+       my ($self, $lei_orig, $pids) = @_;
+       if ($pids) { # one-shot (no event loop)
+               kill 'TERM', @$pids;
+               kill 'PIPE', $$;
+       } else {
+               $self->wq_kill;
+               $self->wq_close;
+       }
+       close(delete $lei_orig->{1}) if $lei_orig->{1};
+}
+
 sub do_query {
        my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
@@ -234,7 +246,10 @@ sub do_query {
        pipe(my $done, $io[0]) or die "pipe $!";
 
        $lei_orig->event_step_init; # wait for shutdowns
-       my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
+       my $done_op = {
+               '' => [ \&query_done, $self, $lei_orig ],
+               '!' => [ \&sigpipe_handler, $self, $lei_orig ]
+       };
        my $in_loop = exists $lei_orig->{sock};
        $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
        my $l2m = $lei->{l2m};
@@ -244,7 +259,7 @@ sub do_query {
                my @l2m_io = (undef, @io[1..$#io]);
                pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
                $self->wq_do('query_prepare', \@l2m_io, $lei);
-               $io[4] //= *STDERR{GLOB};
+               $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
                die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
                fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
                $io[5] = $startq;
@@ -253,7 +268,7 @@ sub do_query {
        unless ($in_loop) {
                my @pids = $self->wq_close;
                # for the $lei->atfork_child_wq PIPE handler:
-               $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+               $done_op->{'!'}->[3] = \@pids;
                $done->event_step;
                my $ipc_worker_reap = $self->can('ipc_worker_reap');
                if (my $l2m_pids = delete $self->{l2m_pids}) {