my $len = $self->{wq_req_len} // (4096 * 33);
my ($rec, $sub, @args);
my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+ local $SIG{PIPE} = sub {
+ die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+ };
until ($self->{-wq_quit}) {
my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
my $i = 0;
my $mode = shift(@m);
if (open(my $fh, $mode, $fd)) {
$self->{$i++} = $fh;
+ $fh->autoflush(1);
} else {
die "$$ open($mode$fd) (FD:$i): $!";
}
die "thaw error on buffer of size:".length($buf);
($sub, @args) = @$rec;
eval { $self->$sub(@args) };
- warn "$$ wq_worker: $@" if $@;
- delete @$self{0, 1, 2};
+ warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+ undef $sub; # quiet SIG{PIPE} handler
+ # need to close explicitly to avoid warnings after SIGPIPE
+ close($_) for (delete(@$self{0..2}));
}
}
PublicInbox::DS::sig_setmask($oldset);
my $on_destroy = $self->ipc_atfork_child;
eval { wq_worker_loop($self) };
- die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
- exit;
+ warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+ exit($@ ? 1 : 0);
} else {
$self->{-wq_workers}->{$pid} = \undef;
}