]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: fix SIGPIPE handling from lei2mail workers
authorEric Wong <e@80x24.org>
Tue, 19 Jan 2021 09:34:28 +0000 (09:34 +0000)
committerEric Wong <e@80x24.org>
Thu, 21 Jan 2021 03:29:07 +0000 (03:29 +0000)
We need to properly propagate SIGPIPE to the top-level
lei-daemon process and avoid relying on auto-close,
since auto-close triggers Perl warnings when explicit
close() does not.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
xt/lei-sigpipe.t

index 4b1dc673bb6123f19f40f374877ec35d6f0a232f..802d2cd95396871bb18f3f110f4f7a04fc21e44b 100644 (file)
@@ -301,10 +301,13 @@ sub atfork_child_wq {
        PIPE => sub {
                $self->x_it(13); # SIGPIPE = 13
                # we need to close explicitly to avoid Perl warning on SIGPIPE
-               close(delete $self->{1});
-               # regular files and /dev/null (-c) won't trigger SIGPIPE
-               close(delete $self->{2}) unless (-f $self->{2} || -c _);
-               syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
+               for my $i (1, 2) {
+                       next unless $self->{$i} && (-p $self->{$i} || -S _);
+                       close(delete $self->{$i});
+               }
+               # trigger the LeiXSearch $done OpPipe:
+               syswrite($self->{0}, '!') if $self->{0} && -p $self->{0};
+               $SIG{PIPE} = 'DEFAULT';
                die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
        });
 }
@@ -322,7 +325,7 @@ sub atfork_parent_wq {
        my @io = delete @$ret{0..2};
        $io[3] = delete($ret->{sock}) // *STDERR{GLOB};
        my $l2m = $ret->{l2m};
-       if ($l2m && $l2m != $wq) {
+       if ($l2m && $l2m != $wq) { # $wq == lxs
                $io[4] = $l2m->{-wq_s1} if $l2m->{-wq_s1};
                if (my @pids = $l2m->wq_close) {
                        $wq->{l2m_pids} = \@pids;
@@ -672,7 +675,7 @@ sub start_mua {
        @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd;
        push @cmd, $mfolder unless defined($replaced);
        $sock //= $self->{sock};
-       if ($PublicInbox::DS::in_loop) { # lei(1) client process runs it
+       if ($sock) { # lei(1) client process runs it
                send($sock, exec_buf(\@cmd, {}), MSG_EOR);
        } else { # oneshot
                $self->{"mua.pid.$self.$$"} = spawn(\@cmd);
index a1dce550230ef9aac2355ab10a3a7c8148b2a89e..8e58ad110fa8d0f56d82c2d620f6f3cca445c573 100644 (file)
@@ -247,11 +247,16 @@ sub _mbox_write_cb ($$) {
        $dedupe->prepare_dedupe;
        sub { # for git_to_mail
                my ($buf, $oid, $kw) = @_;
+               return unless $out;
                my $eml = PublicInbox::Eml->new($buf);
                if (!$dedupe->is_dup($eml, $oid)) {
                        $buf = $eml2mbox->($eml, $kw);
                        my $lk = $ovv->lock_for_scope;
-                       $write->($out, $buf);
+                       eval { $write->($out, $buf) };
+                       if ($@) {
+                               die $@ if ref($@) ne 'PublicInbox::SIGPIPE';
+                               undef $out
+                       }
                }
        }
 }
index 73fd17f49ba7b8dc671a4e8951949088f4826433..45a073a06289d3c103e6e3f6f2fe7859b0c8958f 100644 (file)
@@ -219,7 +219,7 @@ sub start_query { # always runs in main (lei-daemon) process
        @$io = ();
 }
 
-sub query_prepare { # for wq_do,
+sub query_prepare { # called by wq_do
        my ($self, $lei) = @_;
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
@@ -227,6 +227,18 @@ sub query_prepare { # for wq_do,
        $lei->fail($@) if $@;
 }
 
+sub sigpipe_handler {
+       my ($self, $lei_orig, $pids) = @_;
+       if ($pids) { # one-shot (no event loop)
+               kill 'TERM', @$pids;
+               kill 'PIPE', $$;
+       } else {
+               $self->wq_kill;
+               $self->wq_close;
+       }
+       close(delete $lei_orig->{1}) if $lei_orig->{1};
+}
+
 sub do_query {
        my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
@@ -234,7 +246,10 @@ sub do_query {
        pipe(my $done, $io[0]) or die "pipe $!";
 
        $lei_orig->event_step_init; # wait for shutdowns
-       my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
+       my $done_op = {
+               '' => [ \&query_done, $self, $lei_orig ],
+               '!' => [ \&sigpipe_handler, $self, $lei_orig ]
+       };
        my $in_loop = exists $lei_orig->{sock};
        $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
        my $l2m = $lei->{l2m};
@@ -244,7 +259,7 @@ sub do_query {
                my @l2m_io = (undef, @io[1..$#io]);
                pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
                $self->wq_do('query_prepare', \@l2m_io, $lei);
-               $io[4] //= *STDERR{GLOB};
+               $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
                die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
                fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
                $io[5] = $startq;
@@ -253,7 +268,7 @@ sub do_query {
        unless ($in_loop) {
                my @pids = $self->wq_close;
                # for the $lei->atfork_child_wq PIPE handler:
-               $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+               $done_op->{'!'}->[3] = \@pids;
                $done->event_step;
                my $ipc_worker_reap = $self->can('ipc_worker_reap');
                if (my $l2m_pids = delete $self->{l2m_pids}) {
index 4d35bbb37e9381560d991eadd3fff548e4a91e80..448bd7db87e3cf8bd701b1408a81fab25dfda7d5 100644 (file)
@@ -11,19 +11,22 @@ require_mods(qw(json DBD::SQLite Search::Xapian));
 
 my $do_test = sub {
        my $env = shift // {};
-       pipe(my ($r, $w)) or BAIL_OUT $!;
-       open my $err, '+>', undef or BAIL_OUT $!;
-       my $opt = { run_mode => 0, 1 => $w, 2 => $err };
-       my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt);
-       close $w;
-       sysread($r, my $buf, 1);
-       close $r; # trigger SIGPIPE
-       $tp->join;
-       ok(WIFSIGNALED($?), 'signaled');
-       is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE');
-       seek($err, 0, 0);
-       my @err = grep(!m{mkdir /dev/null\b}, <$err>);
-       is_deeply(\@err, [], 'no errors');
+       for my $out ([], [qw(-f mboxcl2)]) {
+               pipe(my ($r, $w)) or BAIL_OUT $!;
+               open my $err, '+>', undef or BAIL_OUT $!;
+               my $opt = { run_mode => 0, 1 => $w, 2 => $err };
+               my $cmd = [qw(lei q -t), @$out, 'bytes:1..'];
+               my $tp = start_script($cmd, $env, $opt);
+               close $w;
+               sysread($r, my $buf, 1);
+               close $r; # trigger SIGPIPE
+               $tp->join;
+               ok(WIFSIGNALED($?), "signaled @$out");
+               is(WTERMSIG($?), SIGPIPE, "got SIGPIPE @$out");
+               seek($err, 0, 0);
+               my @err = grep(!m{mkdir /dev/null\b}, <$err>);
+               is_deeply(\@err, [], "no errors @$out");
+       }
 };
 
 $do_test->();