X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;ds=sidebyside;f=lib%2FPublicInbox%2FIPC.pm;h=fbc91f6f96ea013252721c2c3127f70a6213d979;hb=39d44555e3f04c97e98c7f5d3538bbba6a19656b;hp=c54fcc64481966fd933a63ed81a04e09d836a652;hpb=7dd5b28cb9bdcfa262ddad47d7f033f600675dc3;p=public-inbox.git diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index c54fcc64..fbc91f6f 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -130,7 +130,8 @@ sub ipc_worker_spawn { sub ipc_worker_reap { # dwaitpid callback my ($self, $pid) = @_; - warn "PID:$pid died with \$?=$?\n" if $?; + # SIGTERM (15) is our default exit signal + warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15; } # for base class, override in sub classes @@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } -sub _close_recvd ($) { - my ($self) = @_; - my $x = $self->{-wq_recv_modes}; - my $end = $x ? $#$x : 2; - close($_) for (grep { defined } (delete @$self{0..$end})); -} - sub wq_worker_loop ($) { my ($self) = @_; - my $buf; my $len = $self->{wq_req_len} // (4096 * 33); - my ($sub, $args); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; - local $SIG{PIPE} = sub { - my $cur_sub = $sub; - _close_recvd($self); - die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub; - }; while (1) { - my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF - my $i = 0; + my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]}; + my $nfd = 0; for my $fd (@fds) { my $mode = shift(@m); if (open(my $cmdfh, $mode, $fd)) { - $self->{$i++} = $cmdfh; + $self->{$nfd++} = $cmdfh; $cmdfh->autoflush(1); } else { - die "$$ open($mode$fd) (FD:$i): $!"; + die "$$ open($mode$fd) (FD:$nfd): $!"; } } # Sereal dies on truncated data, Storable returns undef - $args = thaw($buf) // + my $args = thaw($buf) // die "thaw error on buffer of size:".length($buf); - eval { - $sub = shift @$args; - eval { $self->$sub(@$args) }; - undef $sub; # quiet SIG{PIPE} handler - die $@ if $@; - }; + my $sub = shift @$args; + eval { $self->$sub(@$args) }; warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; - # need to close explicitly to avoid warnings after SIGPIPE - _close_recvd($self); + delete @$self{0..($nfd-1)}; } } @@ -400,9 +382,16 @@ sub wq_close { } } +sub wq_kill { + my ($self, $sig) = @_; + my $workers = $self->{-wq_workers} or return; + kill($sig // 'TERM', keys %$workers); +} + sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS } sub DESTROY { + wq_kill($_[0]); wq_close($_[0]); ipc_worker_stop($_[0]); }