]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
lei: test SIGPIPE, stop xsearch workers on client abort
[public-inbox.git] / lib / PublicInbox / IPC.pm
index c54fcc64481966fd933a63ed81a04e09d836a652..fbc91f6f96ea013252721c2c3127f70a6213d979 100644 (file)
@@ -130,7 +130,8 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
        my ($self, $pid) = @_;
-       warn "PID:$pid died with \$?=$?\n" if $?;
+       # SIGTERM (15) is our default exit signal
+       warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
 }
 
 # for base class, override in sub classes
@@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child {
        $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _close_recvd ($) {
-       my ($self) = @_;
-       my $x = $self->{-wq_recv_modes};
-       my $end = $x ? $#$x : 2;
-       close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
 sub wq_worker_loop ($) {
        my ($self) = @_;
-       my $buf;
        my $len = $self->{wq_req_len} // (4096 * 33);
-       my ($sub, $args);
        my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-       local $SIG{PIPE} = sub {
-               my $cur_sub = $sub;
-               _close_recvd($self);
-               die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
-       };
        while (1) {
-               my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
-               my $i = 0;
+               my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
                my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+               my $nfd = 0;
                for my $fd (@fds) {
                        my $mode = shift(@m);
                        if (open(my $cmdfh, $mode, $fd)) {
-                               $self->{$i++} = $cmdfh;
+                               $self->{$nfd++} = $cmdfh;
                                $cmdfh->autoflush(1);
                        } else {
-                               die "$$ open($mode$fd) (FD:$i): $!";
+                               die "$$ open($mode$fd) (FD:$nfd): $!";
                        }
                }
                # Sereal dies on truncated data, Storable returns undef
-               $args = thaw($buf) //
+               my $args = thaw($buf) //
                        die "thaw error on buffer of size:".length($buf);
-               eval {
-                       $sub = shift @$args;
-                       eval { $self->$sub(@$args) };
-                       undef $sub; # quiet SIG{PIPE} handler
-                       die $@ if $@;
-               };
+               my $sub = shift @$args;
+               eval { $self->$sub(@$args) };
                warn "$$ wq_worker: $@" if $@ &&
                                        ref($@) ne 'PublicInbox::SIGPIPE';
-               # need to close explicitly to avoid warnings after SIGPIPE
-               _close_recvd($self);
+               delete @$self{0..($nfd-1)};
        }
 }
 
@@ -400,9 +382,16 @@ sub wq_close {
        }
 }
 
+sub wq_kill {
+       my ($self, $sig) = @_;
+       my $workers = $self->{-wq_workers} or return;
+       kill($sig // 'TERM', keys %$workers);
+}
+
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
+       wq_kill($_[0]);
        wq_close($_[0]);
        ipc_worker_stop($_[0]);
 }