]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: test SIGPIPE, stop xsearch workers on client abort
authorEric Wong <e@80x24.org>
Thu, 14 Jan 2021 07:06:15 +0000 (19:06 -1200)
committerEric Wong <e@80x24.org>
Thu, 14 Jan 2021 23:14:08 +0000 (23:14 +0000)
The new test ensures consistency between oneshot and
client/daemon users.  Cancelling an in-progress result now also
stops xsearch workers to avoid wasted CPU and I/O.

Note the lei->atfork_child_wq usage changes, it is to workaround
a bug in Perl 5: http://nntp.perl.org/group/perl.perl5.porters/258784
<CAHhgV8hPbcmkzWizp6Vijw921M5BOXixj4+zTh3nRS9vRBYk8w@mail.gmail.com>

This switches the internal protocol to use SOCK_SEQPACKET
AF_UNIX sockets to prevent merging messages from the daemon to
client to run pager and kill/exit the client script.

MANIFEST
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiXSearch.pm
script/lei
t/lei.t
xt/lei-sigpipe.t [new file with mode: 0644]

index 810aec4261fc5cf2fde30c82e6e49fd5ff8313b2..2ca240fc90ea317f2e75935a3806dc2e4da531fb 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -429,6 +429,7 @@ xt/git_async_cmp.t
 xt/httpd-async-stream.t
 xt/imapd-mbsync-oimap.t
 xt/imapd-validate.t
+xt/lei-sigpipe.t
 xt/mem-imapd-tls.t
 xt/mem-msgview.t
 xt/msgtime_cmp.t
index c54fcc64481966fd933a63ed81a04e09d836a652..fbc91f6f96ea013252721c2c3127f70a6213d979 100644 (file)
@@ -130,7 +130,8 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
        my ($self, $pid) = @_;
-       warn "PID:$pid died with \$?=$?\n" if $?;
+       # SIGTERM (15) is our default exit signal
+       warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
 }
 
 # for base class, override in sub classes
@@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child {
        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _close_recvd ($) {
-       my ($self) = @_;
-       my $x = $self->{-wq_recv_modes};
-       my $end = $x ? $#$x : 2;
-       close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
 sub wq_worker_loop ($) {
        my ($self) = @_;
-       my $buf;
        my $len = $self->{wq_req_len} // (4096 * 33);
-       my ($sub, $args);
        my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-       local $SIG{PIPE} = sub {
-               my $cur_sub = $sub;
-               _close_recvd($self);
-               die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
-       };
        while (1) {
-               my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
-               my $i = 0;
+               my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
                my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+               my $nfd = 0;
                for my $fd (@fds) {
                        my $mode = shift(@m);
                        if (open(my $cmdfh, $mode, $fd)) {
-                               $self->{$i++} = $cmdfh;
+                               $self->{$nfd++} = $cmdfh;
                                $cmdfh->autoflush(1);
                        } else {
-                               die "$$ open($mode$fd) (FD:$i): $!";
+                               die "$$ open($mode$fd) (FD:$nfd): $!";
                        }
                }
                # Sereal dies on truncated data, Storable returns undef
-               $args = thaw($buf) //
+               my $args = thaw($buf) //
                        die "thaw error on buffer of size:".length($buf);
-               eval {
-                       $sub = shift @$args;
-                       eval { $self->$sub(@$args) };
-                       undef $sub; # quiet SIG{PIPE} handler
-                       die $@ if $@;
-               };
+               my $sub = shift @$args;
+               eval { $self->$sub(@$args) };
                warn "$$ wq_worker: $@" if $@ &&
                                        ref($@) ne 'PublicInbox::SIGPIPE';
-               # need to close explicitly to avoid warnings after SIGPIPE
-               _close_recvd($self);
+               delete @$self{0..($nfd-1)};
        }
 }
 
@@ -400,9 +382,16 @@ sub wq_close {
        }
 }
 
+sub wq_kill {
+       my ($self, $sig) = @_;
+       my $workers = $self->{-wq_workers} or return;
+       kill($sig // 'TERM', keys %$workers);
+}
+
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
+       wq_kill($_[0]);
        wq_close($_[0]);
        ipc_worker_stop($_[0]);
 }
index 7313738eb3f52aea3e4f2c3740e4c27f400285bd..2889fa76f9cd2107e9a9f1e6282d0b0aa8ba6c61 100644 (file)
@@ -11,13 +11,13 @@ use v5.10.1;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
        PublicInbox::LeiQuery);
 use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use POSIX ();
 use IO::Handle ();
 use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
-use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now dwaitpid);
 use PublicInbox::Spawn qw(spawn run_die popen_rd);
@@ -238,16 +238,15 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-sub x_it ($$) { # pronounced "exit"
+# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
+sub x_it ($$) {
        my ($self, $code) = @_;
-       $self->{1}->autoflush(1); # make sure client sees stdout before exit
-       my $sig = ($code & 127);
-       $code >>= 8 unless $sig;
+       # make sure client sees stdout before exit
+       $self->{1}->autoflush(1) if $self->{1};
        if (my $sock = $self->{sock}) {
-               my $fds = [ map { fileno($_) } @$self{0..2} ];
-               $send_cmd->($sock, $fds, "exit=$code\n", 0);
-       } else { # for oneshot
-               $quit->($code);
+               send($sock, "x_it $code", MSG_EOR);
+       } elsif (!($code & 127)) { # oneshot, ignore signals
+               $quit->($code >> 8);
        }
 }
 
@@ -274,22 +273,20 @@ sub atfork_prepare_wq {
                                grep { defined } @$self{qw(0 1 2 sock)}
 }
 
-# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+# usage: my %sig = $lei->atfork_child_wq($wq);
+#       local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
        my ($self, $wq) = @_;
-       return () if $self->{0}; # did not fork
-       $self->{$_} = $wq->{$_} for (0..2);
-       $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
-       my $oldpipe = $SIG{PIPE};
+       @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
        %PATH2CFG = ();
        @TO_CLOSE_ATFORK_CHILD = ();
-       (
-               __WARN__ => sub { err($self, @_) },
-               PIPE => sub {
-                       $self->x_it(141);
-                       $oldpipe->() if ref($oldpipe) eq 'CODE';
-               }
-       );
+       (__WARN__ => sub { err($self, @_) },
+       PIPE => sub {
+               $self->x_it(13); # SIGPIPE = 13
+               # we need to close explicitly to avoid Perl warning on SIGPIPE
+               close($_) for (delete @$self{1..2});
+               die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
+       });
 }
 
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
@@ -300,9 +297,9 @@ sub atfork_parent_wq {
                my $ret = bless { %$self }, ref($self);
                $self->{env} = $env;
                delete @$ret{qw(-lei_store cfg pgr)};
-               ($ret, delete @$ret{qw(0 1 2 sock)});
+               ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
        } else {
-               ($self, @$self{qw(0 1 2 sock)});
+               ($self, @$self{0..2}, $self->{sock} // ());
        }
 }
 
@@ -647,7 +644,7 @@ sub start_pager {
                my $buf = "exec 1\0".$pager;
                while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" };
                my $fds = [ map { fileno($_) } @$rdr{0..2} ];
-               $send_cmd->($sock, $fds, $buf .= "\n", 0);
+               $send_cmd->($sock, $fds, $buf, MSG_EOR);
        } else {
                $pgr->[0] = spawn([$pager], $env, $rdr);
        }
@@ -660,50 +657,39 @@ sub start_pager {
 sub stop_pager {
        my ($self) = @_;
        my $pgr = delete($self->{pgr}) or return;
-       my $pid = $pgr->[0];
-       close $self->{1};
-       # {2} may not be redirected
-       $self->{1} = $pgr->[1];
        $self->{2} = $pgr->[2];
+       # do not restore original stdout, just close it so we error out
+       close(delete($self->{1})) if $self->{1};
+       my $pid = $pgr->[0];
        dwaitpid($pid, undef, $self->{sock}) if $pid;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
        my ($sock) = @_; # ignore other
-       $sock->blocking(1);
        $sock->autoflush(1);
        my $self = bless { sock => $sock }, __PACKAGE__;
-       vec(my $rin = '', fileno($sock), 1) = 1;
-       # `say $sock' triggers "die" in lei(1)
-       my $buf;
-       if (select(my $rout = $rin, undef, undef, 1)) {
-               my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
-               if (scalar(@fds) == 3) {
-                       my $i = 0;
-                       for my $rdr (qw(<&= >&= >&=)) {
-                               my $fd = shift(@fds);
-                               if (open(my $fh, $rdr, $fd)) {
-                                       $self->{$i++} = $fh;
-                               }  else {
-                                       say $sock "open($rdr$fd) (FD=$i): $!";
-                                       return;
-                               }
+       vec(my $rvec, fileno($sock), 1) = 1;
+       select($rvec, undef, undef, 1) or
+               return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+       my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
+       if (scalar(@fds) == 3) {
+               my $i = 0;
+               for my $rdr (qw(<&= >&= >&=)) {
+                       my $fd = shift(@fds);
+                       if (open(my $fh, $rdr, $fd)) {
+                               $self->{$i++} = $fh;
+                               next;
                        }
-               } else {
-                       say $sock "recv_cmd failed: $!";
-                       return;
+                       return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
                }
        } else {
-               say $sock "timed out waiting to recv FDs";
-               return;
+               return send($sock, "recv_cmd failed: $!", MSG_EOR);
        }
        $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
        # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
        # $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
-       if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
-               say $sock "request command truncated";
-               return;
-       }
+       substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
+               return send($sock, 'request command truncated', MSG_EOR);
        my ($argc, @argv) = split(/\0/, $buf, -1);
        undef $buf;
        my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -711,23 +697,50 @@ sub accept_dispatch { # Listener {post_accept} callback
                local %ENV = %env;
                $self->{env} = \%env;
                eval { dispatch($self, @argv) };
-               say $sock $@ if $@;
+               send($sock, $@, MSG_EOR) if $@;
        } else {
-               say $sock "chdir($env{PWD}): $!"; # implicit close
+               send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close
        }
 }
 
+sub dclose {
+       my ($self) = @_;
+       delete $self->{lxs}; # stops LeiXSearch queries
+       $self->close; # PublicInbox::DS::close
+}
+
 # for long-running results
 sub event_step {
        my ($self) = @_;
        local %ENV = %{$self->{env}};
-       eval {}; # TODO
-       if ($@) {
-               say { $self->{sock} } $@;
-               $self->close; # PublicInbox::DS::close
+       my $sock = $self->{sock};
+       eval {
+               while (my @fds = $recv_cmd->($sock, my $buf, 4096)) {
+                       if (scalar(@fds) == 1 && !defined($fds[0])) {
+                               return if $! == EAGAIN;
+                               next if $! == EINTR;
+                               last if $! == ECONNRESET;
+                               die "recvmsg: $!";
+                       }
+                       for my $fd (@fds) {
+                               open my $rfh, '+<&=', $fd;
+                       }
+                       die "unrecognized client signal: $buf";
+               }
+               dclose($self);
+       };
+       if (my $err = $@) {
+               eval { $self->fail($err) };
+               dclose($self);
        }
 }
 
+sub event_step_init {
+       my ($self) = @_;
+       $self->{sock}->blocking(0);
+       $self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+}
+
 sub noop {}
 
 our $oldset; sub oldset { $oldset }
@@ -742,7 +755,7 @@ sub lazy_start {
                die "connect($path): $!";
        }
        umask(077) // die("umask(077): $!");
-       socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+       socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
        bind($l, pack_sockaddr_un($path)) or die "bind($path): $!";
        listen($l, 1024) or die "listen: $!";
        my @st = stat($path) or die "stat($path): $!";
@@ -793,7 +806,7 @@ sub lazy_start {
                USR2 => \&noop,
        };
        my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
-       local %SIG = (%SIG, %$sig) if !$sigfd;
+       local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
        local $SIG{PIPE} = 'IGNORE';
        if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
                push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
@@ -853,24 +866,19 @@ sub oneshot {
        local $quit = $exit if $exit;
        local %PATH2CFG;
        umask(077) // die("umask(077): $!");
-       local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
-       eval {
-               my $self = bless {
-                       0 => *STDIN{GLOB},
-                       1 => *STDOUT{GLOB},
-                       2 => *STDERR{GLOB},
-                       env => \%ENV
-               }, __PACKAGE__;
-               dispatch($self, @ARGV);
-       };
-       die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+       dispatch((bless {
+               0 => *STDIN{GLOB},
+               1 => *STDOUT{GLOB},
+               2 => *STDERR{GLOB},
+               env => \%ENV
+       }, __PACKAGE__), @ARGV);
 }
 
 # ensures stdout hits the FS before sock disconnects so a client
 # can immediately reread it
 sub DESTROY {
        my ($self) = @_;
-       $self->{1}->autoflush(1);
+       $self->{1}->autoflush(1) if $self->{1};
        stop_pager($self);
 }
 
index 8a1f4f822014a9a539ac9ea6a92f2f3789aeea28..194c5e28aad8b07de799d15a6d667916cb089cab 100644 (file)
@@ -108,8 +108,9 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
        my ($self, $lei) = @_;
-       my $bref = delete $lei->{ovv_buf} or return;
-       print { $lei->{1} } $$bref;
+       if (my $bref = delete $lei->{ovv_buf}) {
+               print { $lei->{1} } $$bref;
+       }
 }
 
 # JSON module ->pretty output wastes too much vertical white space,
index 7ca01454f04c7d65bc421582d1143758032ef350..1a3e11932616466a760f258bfa55962cbb954746 100644 (file)
@@ -40,14 +40,13 @@ sub lei_q {
        if ($opt->{external} // 1) {
                $self->_externals_each(\&_vivify_external, \@srcs);
        }
-       my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs);
+       my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
        $j = 1 if !$opt->{thread};
        $j++ if $opt->{'local'}; # for sto->search below
-       if ($self->{sock}) {
-               $self->atfork_prepare_wq($lxs);
-               $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-                       // $lxs->wq_workers($j);
-       }
+       $self->atfork_prepare_wq($lxs);
+       $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+               // $lxs->wq_workers($j);
+
        unshift(@srcs, $sto->search) if $opt->{'local'};
        # no forking workers after this
        require PublicInbox::LeiOverview;
@@ -77,16 +76,7 @@ sub lei_q {
        # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
        $self->{mset_opt} = \%mset_opt;
        $self->{ovv}->ovv_begin($self);
-       pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-       require PublicInbox::EOFpipe;
-       my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
-       $lxs->do_query($self, $qry_done, \@srcs);
-       $eof->event_step unless $self->{sock};
-}
-
-sub query_done { # PublicInbox::EOFpipe callback
-       my ($self) = @_;
-       $self->{ovv}->ovv_end($self);
+       $lxs->do_query($self, \@srcs);
 }
 
 1;
index c030b2b29b6f8784a9e733d25192532429f62de8..d06b6f1d63ad4a7480a6324949278722fe606ed8 100644 (file)
@@ -92,7 +92,9 @@ sub _mset_more ($$) {
 
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
-       local %SIG = (%SIG, $lei->atfork_child_wq($self));
+       my %sig = $lei->atfork_child_wq($self);
+       local @SIG{keys %sig} = values %sig;
+
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        unless ($srch && $over) {
                my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei, $srcs) = @_;
+       my %sig = $lei->atfork_child_wq($self);
+       local @SIG{keys %sig} = values %sig;
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
-       local %SIG = (%SIG, $lei->atfork_child_wq($self));
        $self->attach_external($_) for @$srcs;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        do {
@@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub query_done { # PublicInbox::EOFpipe callback
+       my ($lei) = @_;
+       $lei->{ovv}->ovv_end($lei);
+       $lei->dclose;
+}
+
 sub do_query {
-       my ($self, $lei_orig, $qry_done, $srcs) = @_;
+       my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+       pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
        $io[0] = $qry_done; # don't need stdin
        $io[1]->autoflush(1);
        $io[2]->autoflush(1);
@@ -160,9 +171,20 @@ sub do_query {
        for my $rmt (@{$self->{remotes} // []}) {
                $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
-
-       # sent off to children, they will drop remaining references to it
-       close $qry_done;
+       @io = ();
+       close $qry_done; # fully closed when children are done
+
+       # query_done will run when query_*mset close $qry_done
+       if ($lei_orig->{sock}) { # watch for client premature exit
+               require PublicInbox::EOFpipe;
+               PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+               $lei_orig->{lxs} = $self;
+               $lei_orig->event_step_init;
+       } else {
+               $self->wq_close;
+               read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+               query_done($lei_orig); # may SIGPIPE
+       }
 }
 
 sub ipc_atfork_child {
index 5c32ab88d07ff7ce39d3b5b5cba7f59467d09e34..9610a876b489e7b55726ad5d0f1ea253edec67f0 100755 (executable)
@@ -3,32 +3,47 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict;
 use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EINTR ECONNRESET);
 use PublicInbox::CmdIPC4;
 my $narg = 4;
+my ($sock, $pwd);
 my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
 my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
        require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
-       $narg = 4;
        $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
        PublicInbox::Spawn->can('send_cmd4');
 };
 
+sub sigchld {
+       my ($sig) = @_;
+       my $flags = $sig ? POSIX::WNOHANG() : 0;
+       while (waitpid(-1, $flags) > 0) {}
+}
+
 sub exec_cmd {
        my ($fds, $argc, @argv) = @_;
-       my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
-       my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=',
-               *STDERR{IO}, '>&=');
+       my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=', *STDERR{IO}, '>&=');
+       my @rdr;
        for my $fd (@$fds) {
                my ($old_io, $mode) = splice(@m, 0, 2);
-               open($old_io, $mode, $fd) or die "open $mode$fd: $!";
+               open(my $tmpfh, $mode, $fd) or die "open $mode$fd: $!";
+               push @rdr, $old_io, $mode, $tmpfh;
+       }
+       require POSIX; # WNOHANG
+       $SIG{CHLD} = \&sigchld;
+       my $pid = fork // die "fork: $!";
+       if ($pid == 0) {
+               my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
+               while (my ($old_io, $mode, $tmpfh) = splice(@rdr, 0, 3)) {
+                       open $old_io, $mode, $tmpfh or die "open $mode: $!";
+               }
+               %ENV = (%ENV, %env);
+               exec(@argv);
+               die "exec: @argv: $!";
        }
-       %ENV = (%ENV, %env);
-       exec(@argv);
-       die "exec: @argv: $!";
 }
 
-my ($sock, $pwd);
 if ($send_cmd && eval {
        my $path = do {
                my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
@@ -40,10 +55,10 @@ if ($send_cmd && eval {
                        require File::Path;
                        File::Path::mkpath($runtime_dir, 0, 0700);
                }
-               "$runtime_dir/$narg.sock";
+               "$runtime_dir/$narg.seq.sock";
        };
        my $addr = pack_sockaddr_un($path);
-       socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+       socket($sock, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
        unless (connect($sock, $addr)) { # start the daemon if not started
                local $ENV{PERL5LIB} = join(':', @INC);
                open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
@@ -73,22 +88,41 @@ Falling back to (slow) one-shot mode
        }
        1;
 }) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
-       local $ENV{PWD} = $pwd;
+       $ENV{PWD} = $pwd;
        my $buf = join("\0", scalar(@ARGV), @ARGV);
        while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
        $buf .= "\0\0";
-       select $sock;
-       $| = 1; # unbuffer selected $sock
-       $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
-       while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) {
-               if ($buf =~ /\Aexit=([0-9]+)\n\z/) {
-                       exit($1);
-               } elsif ($buf =~ /\Aexec (.+)\n\z/) {
+       $send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR);
+       $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
+               my ($sig) = @_; # 'TERM', not an integer :<
+               $SIG{$sig} = 'DEFAULT';
+               kill($sig, $$); # exit($signo + 128)
+       };
+       my $x_it_code = 0;
+       while (1) {
+               my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
+               if (scalar(@fds) == 1 && !defined($fds[0])) {
+                       last if $! == ECONNRESET;
+                       next if $! == EINTR;
+                       die "recvmsg: $!";
+               }
+               last if $buf eq '';
+               if ($buf =~ /\Ax_it ([0-9]+)\z/) {
+                       $x_it_code = $1 + 0;
+                       last;
+               } elsif ($buf =~ /\Aexec (.+)\z/) {
                        exec_cmd(\@fds, split(/\0/, $1));
                } else {
+                       sigchld();
                        die $buf;
                }
        }
+       sigchld();
+       if (my $sig = ($x_it_code & 127)) {
+               kill $sig, $$;
+               sleep;
+       }
+       exit($x_it_code >> 8);
 } else { # for systems lacking Socket::MsgHdr or Inline::C
        warn $@ if $@;
        require PublicInbox::LEI;
diff --git a/t/lei.t b/t/lei.t
index 6819f1829a24a67bf9e628ceef9ce6d445bbecdb..3ebaade6d0b1e534db94b3113d56d0267fe7226f 100644 (file)
--- a/t/lei.t
+++ b/t/lei.t
@@ -215,7 +215,7 @@ SKIP: { # real socket
        skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
 
        local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
-       my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
+       my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.seq.sock";
 
        ok($lei->('daemon-pid'), 'daemon-pid');
        is($err, '', 'no error from daemon-pid');
diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t
new file mode 100644 (file)
index 0000000..4d35bbb
--- /dev/null
@@ -0,0 +1,32 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE);
+require_mods(qw(json DBD::SQLite Search::Xapian));
+# XXX this needs an already configured lei instance with many messages
+
+my $do_test = sub {
+       my $env = shift // {};
+       pipe(my ($r, $w)) or BAIL_OUT $!;
+       open my $err, '+>', undef or BAIL_OUT $!;
+       my $opt = { run_mode => 0, 1 => $w, 2 => $err };
+       my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt);
+       close $w;
+       sysread($r, my $buf, 1);
+       close $r; # trigger SIGPIPE
+       $tp->join;
+       ok(WIFSIGNALED($?), 'signaled');
+       is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE');
+       seek($err, 0, 0);
+       my @err = grep(!m{mkdir /dev/null\b}, <$err>);
+       is_deeply(\@err, [], 'no errors');
+};
+
+$do_test->();
+$do_test->({XDG_RUNTIME_DIR => '/dev/null'});
+
+done_testing;