pipe(my ($r_req, $w_req)) or die "pipe: $!";
pipe(my ($r_res, $w_res)) or die "pipe: $!";
my $sigset = $oldset // PublicInbox::DS::block_signals();
- my $parent = $$;
$self->ipc_atfork_prepare;
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
eval { PublicInbox::DS->Reset };
+ delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
$w_req = $r_res = undef;
$w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
sub ipc_worker_reap { # dwaitpid callback
my ($self, $pid) = @_;
- warn "PID:$pid died with \$?=$?\n" if $?;
+ # SIGTERM (15) is our default exit signal
+ warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
}
# for base class, override in sub classes
$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
}
-sub _close_recvd ($) {
- my ($self) = @_;
- my $x = $self->{-wq_recv_modes};
- my $end = $x ? $#$x : 2;
- close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
sub wq_worker_loop ($) {
my ($self) = @_;
- my $buf;
my $len = $self->{wq_req_len} // (4096 * 33);
- my ($sub, $args);
my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
- local $SIG{PIPE} = sub {
- my $cur_sub = $sub;
- _close_recvd($self);
- die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
- };
while (1) {
- my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
- my $i = 0;
+ my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+ my $nfd = 0;
for my $fd (@fds) {
my $mode = shift(@m);
if (open(my $cmdfh, $mode, $fd)) {
- $self->{$i++} = $cmdfh;
+ $self->{$nfd++} = $cmdfh;
$cmdfh->autoflush(1);
} else {
- die "$$ open($mode$fd) (FD:$i): $!";
+ die "$$ open($mode$fd) (FD:$nfd): $!";
}
}
# Sereal dies on truncated data, Storable returns undef
- $args = thaw($buf) //
+ my $args = thaw($buf) //
die "thaw error on buffer of size:".length($buf);
- eval {
- $sub = shift @$args;
- eval { $self->$sub(@$args) };
- undef $sub; # quiet SIG{PIPE} handler
- die $@ if $@;
- };
+ my $sub = shift @$args;
+ eval { $self->$sub(@$args) };
warn "$$ wq_worker: $@" if $@ &&
ref($@) ne 'PublicInbox::SIGPIPE';
- # need to close explicitly to avoid warnings after SIGPIPE
- _close_recvd($self);
+ delete @$self{0..($nfd-1)};
}
}
my $pid = fork // die "fork: $!";
if ($pid == 0) {
eval { PublicInbox::DS->Reset };
- close(delete $self->{-wq_s1});
- delete $self->{qw(-wq_workers -wq_ppid)};
+ delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
$SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
$SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
local $0 = $self->{-wq_ident};
my ($self, $ident, $nr_workers, $oldset) = @_;
($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
return if $self->{-wq_s1}; # idempotent
- my ($s1, $s2);
- socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+ $self->{-wq_s1} = $self->{-wq_s2} = undef;
+ socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
+ die "socketpair: $!";
$self->ipc_atfork_prepare;
$nr_workers //= 4;
$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
my $sigset = $oldset // PublicInbox::DS::block_signals();
$self->{-wq_workers} = {};
$self->{-wq_ident} = $ident;
- $self->{-wq_s1} = $s1;
- $self->{-wq_s2} = $s2;
_wq_worker_start($self, $sigset) for (1..$nr_workers);
PublicInbox::DS::sig_setmask($sigset) unless $oldset;
$self->{-wq_ppid} = $$;
my $ppid = delete $self->{-wq_ppid} or return;
my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
return if $ppid != $$; # can't reap siblings or parents
+ return (keys %$workers) if wantarray; # caller will reap
for my $pid (keys %$workers) {
dwaitpid($pid, \&ipc_worker_reap, $self);
}
}
+sub wq_kill {
+ my ($self, $sig) = @_;
+ my $workers = $self->{-wq_workers} or return;
+ kill($sig // 'TERM', keys %$workers);
+}
+
sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
sub DESTROY {
- wq_close($_[0]);
- ipc_worker_stop($_[0]);
+ my ($self) = @_;
+ my $ppid = $self->{-wq_ppid};
+ wq_kill($self) if $ppid && $ppid == $$;
+ wq_close($self);
+ ipc_worker_stop($self);
}
+# Sereal doesn't have dclone
+sub deep_clone { thaw(freeze($_[-1])) }
+
1;