]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
update copyrights for 2021
[public-inbox.git] / lib / PublicInbox / IPC.pm
index 0baa218c016fdd1271aeddd4bd0a7b353232d9c6..288a8c94a0e45086beafb7eed95f9836aee3a10f 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # base class for remote IPC calls, requires Storable
@@ -56,8 +56,6 @@ sub ipc_return ($$$) {
 
 sub ipc_worker_loop ($$) {
        my ($self, $s2) = @_;
-       $self->ipc_atfork_child if $self->can('ipc_atfork_child');
-       $s2->autoflush(1);
        while (my $rec = _get_rec($s2)) {
                my ($wantarray, $sub, @args) = @$rec;
                if (!defined($wantarray)) { # no waiting if client doesn't care
@@ -73,7 +71,7 @@ sub ipc_worker_loop ($$) {
        }
 }
 
-sub ipc_worker_spawn ($$$) {
+sub ipc_worker_spawn {
        my ($self, $ident, $oldset) = @_;
        return unless $enc;
        my $pid = $self->{-ipc_worker_pid};
@@ -82,43 +80,62 @@ sub ipc_worker_spawn ($$$) {
        my ($s1, $s2);
        socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
        my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+       my $parent = $$;
+       $self->ipc_atfork_parent;
        defined($pid = fork) or die "fork: $!";
        if ($pid == 0) {
-               undef $s1;
-               local $0 = $ident;
+               eval { PublicInbox::DS->Reset };
+               $self->{-ipc_parent_pid} = $parent;
+               close $s1 or die "close(\$s1): $!";
+               $s2->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+               local $0 = $ident;
                PublicInbox::Sigfd::sig_setmask($oldset);
+               $self->ipc_atfork_child;
                eval { ipc_worker_loop($self, $s2) };
-               die "worker $ident died: $@\n" if $@;
-               $self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+               die "worker $ident PID:$$ died: $@\n" if $@;
                exit;
        }
        PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+       close $s2 or die "close(\$s2): $!";
        $s1->autoflush(1);
        $self->{-ipc_sock} = $s1;
        $self->{-ipc_worker_pid} = $pid;
 }
 
-sub ipc_reap_worker { # dwaitpid callback
+sub ipc_worker_reap { # dwaitpid callback
        my ($self, $pid) = @_;
        warn "PID:$pid died with \$?=$?\n" if $?;
 }
 
+# for base class, override in superclasses
+sub ipc_atfork_parent {}
+sub ipc_atfork_child {}
+
+sub ipc_worker_exit {
+       my (undef, $code) = @_;
+       exit($code);
+}
+
 sub ipc_worker_stop {
        my ($self) = @_;
        my $pid;
-       if (delete $self->{-ipc_sock}) {
-               $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
-       } else {
+       my $s1 = delete $self->{-ipc_sock} or do {
                $pid = delete $self->{-ipc_worker_pid} and
-                       die "unexpected PID:$pid";
-       }
-       return unless $pid;
-       eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+                       die "unexpected PID:$pid without ipc_sock";
+               return;
+       };
+       $pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+       _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
+       shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+       eval {
+               my $reap = $self->can('ipc_worker_reap');
+               PublicInbox::DS::dwaitpid($pid, $reap, $self);
+       };
        if ($@) {
                my $wp = waitpid($pid, 0);
                $pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
-               ipc_reap_worker($self, $pid);
+               $self->ipc_worker_reap($pid);
        }
 }