-# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# base class for remote IPC calls, requires Storable
sub ipc_worker_loop ($$) {
my ($self, $s2) = @_;
- $self->ipc_atfork_child if $self->can('ipc_atfork_child');
- $s2->autoflush(1);
while (my $rec = _get_rec($s2)) {
my ($wantarray, $sub, @args) = @$rec;
if (!defined($wantarray)) { # no waiting if client doesn't care
}
}
-sub ipc_worker_spawn ($$$) {
+sub ipc_worker_spawn {
my ($self, $ident, $oldset) = @_;
return unless $enc;
my $pid = $self->{-ipc_worker_pid};
my ($s1, $s2);
socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+ my $parent = $$;
+ $self->ipc_atfork_parent;
defined($pid = fork) or die "fork: $!";
if ($pid == 0) {
- undef $s1;
- local $0 = $ident;
+ eval { PublicInbox::DS->Reset };
+ $self->{-ipc_parent_pid} = $parent;
+ close $s1 or die "close(\$s1): $!";
+ $s2->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+ local $0 = $ident;
PublicInbox::Sigfd::sig_setmask($oldset);
+ $self->ipc_atfork_child;
eval { ipc_worker_loop($self, $s2) };
- die "worker $ident died: $@\n" if $@;
- $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+ die "worker $ident PID:$$ died: $@\n" if $@;
exit;
}
PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+ close $s2 or die "close(\$s2): $!";
$s1->autoflush(1);
$self->{-ipc_sock} = $s1;
$self->{-ipc_worker_pid} = $pid;
}
-sub ipc_reap_worker { # dwaitpid callback
+sub ipc_worker_reap { # dwaitpid callback
my ($self, $pid) = @_;
warn "PID:$pid died with \$?=$?\n" if $?;
}
+# for base class, override in superclasses
+sub ipc_atfork_parent {}
+sub ipc_atfork_child {}
+
+sub ipc_worker_exit {
+ my (undef, $code) = @_;
+ exit($code);
+}
+
sub ipc_worker_stop {
my ($self) = @_;
my $pid;
- if (delete $self->{-ipc_sock}) {
- $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
- } else {
+ my $s1 = delete $self->{-ipc_sock} or do {
$pid = delete $self->{-ipc_worker_pid} and
- die "unexpected PID:$pid";
- }
- return unless $pid;
- eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+ die "unexpected PID:$pid without ipc_sock";
+ return;
+ };
+ $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+ _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
+ shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+ eval {
+ my $reap = $self->can('ipc_worker_reap');
+ PublicInbox::DS::dwaitpid($pid, $reap, $self);
+ };
if ($@) {
my $wp = waitpid($pid, 0);
$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
- ipc_reap_worker($self, $pid);
+ $self->ipc_worker_reap($pid);
}
}