]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ipc: more consistent behavior between worker types
authorEric Wong <e@80x24.org>
Fri, 29 Jan 2021 07:42:56 +0000 (12:42 +0500)
committerEric Wong <e@80x24.org>
Sat, 30 Jan 2021 01:08:18 +0000 (01:08 +0000)
Localize signals inside the respective worker loops
in case there's circular references.

We'll also rely on OnDestroy to trigger exits from the
ipc_worker_loop like we do with wq_worker_loop.  And
also add some more developer documentation to help future
developers.

The default signals remain different, for now.
Cleanup some unnecessary "use" statements while we're
loading OnDestroy.

lib/PublicInbox/IPC.pm

index 838f95306d761eec38a0b9b7393b87947e972c2b..ece0e8b809798b5571711624875a746b2b452fc9 100644 (file)
@@ -2,16 +2,20 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # base class for remote IPC calls and workqueues, requires Storable or Sereal
+# - ipc_do and ipc_worker_* is for a single worker/producer and uses pipes
+# - wq_do and wq_worker* is for a single producer and multiple workers,
+#   using SOCK_SEQPACKET for work distribution
+# use ipc_do when you need work done on a certain process
+# use wq_do when your work can be done on any idle worker
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
 use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
-use POSIX qw(mkfifo WNOHANG);
+use PublicInbox::OnDestroy;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
-use File::Temp 0.19 (); # 0.19 for ->newdir
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my $WQ_MAX_WORKERS = 4096;
@@ -107,16 +111,22 @@ sub ipc_worker_spawn {
        if ($pid == 0) {
                srand($seed);
                eval { PublicInbox::DS->Reset };
-               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
+               delete @$self{qw(-wq_s1 -wq_s2 -wq_workers -wq_ppid)};
                $w_req = $r_res = undef;
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                local $0 = $ident;
                PublicInbox::DS::sig_setmask($sigset);
+               # ensure we properly exit even if warn() dies:
+               my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                my $on_destroy = $self->ipc_atfork_child;
-               eval { ipc_worker_loop($self, $r_req, $w_res) };
+               eval {
+                       local %SIG = %SIG;
+                       ipc_worker_loop($self, $r_req, $w_res);
+               };
                die "worker $ident PID:$$ died: $@\n" if $@;
-               exit;
+               undef $on_destroy;
+               undef $end; # trigger exit
        }
        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
        $r_req = $w_res = undef;
@@ -320,14 +330,17 @@ sub _wq_worker_start ($$$) {
                eval { PublicInbox::DS->Reset };
                delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
                @$self{keys %$fields} = values(%$fields) if $fields;
-               $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
-               $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
+               $SIG{$_} = 'IGNORE' for (qw(PIPE));
+               $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
                local $0 = $self->{-wq_ident};
                PublicInbox::DS::sig_setmask($oldset);
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                my $on_destroy = $self->ipc_atfork_child;
-               eval { wq_worker_loop($self) };
+               eval {
+                       local %SIG = %SIG;
+                       wq_worker_loop($self);
+               };
                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
                undef $on_destroy;
                undef $end; # trigger exit