]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
www_stream: add trailing slash for help and color links
[public-inbox.git] / lib / PublicInbox / IPC.pm
index c8673e26832fd960601903aa532e8b5ac290cc46..1fa67d00e704baff9b769a12ab25487a9cf1d774 100644 (file)
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
                $w_res->autoflush(1);
                $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
                local $0 = $ident;
-               PublicInbox::DS::sig_setmask($sigset);
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
                        local @$self{keys %$fields} = values(%$fields);
                        my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
+                       PublicInbox::DS::sig_setmask($sigset);
                        ipc_worker_loop($self, $r_req, $w_res);
                };
                warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -240,8 +240,9 @@ sub recv_and_run {
 }
 
 sub wq_worker_loop ($) {
-       my ($self) = @_;
+       my ($self, $bcast_a) = @_;
        my $wqw = PublicInbox::WQWorker->new($self);
+       PublicInbox::WQWorker->new($self, '-wq_bcast2');
        PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
        PublicInbox::DS->EventLoop;
        PublicInbox::DS->Reset;
@@ -252,6 +253,20 @@ sub do_sock_stream { # via wq_io_do, for big requests
        recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
+sub wq_broadcast {
+       my ($self, $sub, @args) = @_;
+       if (my $wkr = $self->{-wq_workers}) {
+               for my $bcast1 (values %$wkr) {
+                       my $buf = ipc_freeze([$sub, @args]);
+                       send($bcast1, $buf, MSG_EOR) // croak "send: $!";
+                       # XXX shouldn't have to deal with EMSGSIZE here...
+               }
+       } else {
+               eval { $self->$sub(@args) };
+               warn "wq_broadcast: $@" if $@;
+       }
+}
+
 sub wq_io_do { # always async
        my ($self, $sub, $ios, @args) = @_;
        if (my $s1 = $self->{-wq_s1}) { # run in worker
@@ -284,16 +299,21 @@ sub wq_io_do { # always async
 
 sub _wq_worker_start ($$$) {
        my ($self, $oldset, $fields) = @_;
+       my ($bcast1, $bcast2);
+       socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
+                                               die "socketpair: $!";
        my $seed = rand(0xffffffff);
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
                srand($seed);
+               undef $bcast1;
                eval { PublicInbox::DS->Reset };
-               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
+               delete @$self{qw(-wq_s1 -wq_ppid)};
+               $self->{-wq_worker_nr} =
+                               keys %{delete($self->{-wq_workers}) // {}};
                $SIG{$_} = 'IGNORE' for (qw(PIPE));
                $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
-               local $0 = $self->{-wq_ident};
-               PublicInbox::DS::sig_setmask($oldset);
+               local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}";
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                eval {
@@ -301,12 +321,14 @@ sub _wq_worker_start ($$$) {
                        local @$self{keys %$fields} = values(%$fields);
                        my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
+                       PublicInbox::DS::sig_setmask($oldset);
+                       $self->{-wq_bcast2} = $bcast2;
                        wq_worker_loop($self);
                };
                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
                undef $end; # trigger exit
        } else {
-               $self->{-wq_workers}->{$pid} = \undef;
+               $self->{-wq_workers}->{$pid} = $bcast1;
        }
 }
 
@@ -319,7 +341,7 @@ sub wq_workers_start {
        socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
                die "socketpair: $!";
        $self->ipc_atfork_prepare;
-       $nr_workers //= 4;
+       $nr_workers //= $self->{-wq_nr_workers};
        $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
        my $sigset = $oldset // PublicInbox::DS::block_signals();
        $self->{-wq_workers} = {};
@@ -332,6 +354,7 @@ sub wq_workers_start {
 sub wq_worker_incr { # SIGTTIN handler
        my ($self, $oldset, $fields) = @_;
        $self->{-wq_s2} or return;
+       die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
        return if wq_workers($self) >= $WQ_MAX_WORKERS;
        $self->ipc_atfork_prepare;
        my $sigset = $oldset // PublicInbox::DS::block_signals();
@@ -347,6 +370,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
        my ($self) = @_;
        return unless wq_workers($self);
+       die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
        $self->wq_io_do('wq_exit');
        # caller must call wq_worker_decr_wait in main loop
 }
@@ -354,6 +378,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 sub wq_worker_decr_wait {
        my ($self, $timeout, $cb, @args) = @_;
        return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+       die "-wq_nr_workers locked" if defined $self->{-wq_nr_workers};
        my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
        vec(my $rin = '', fileno($s1), 1) = 1;
        select(my $rout = $rin, undef, undef, $timeout) or
@@ -395,9 +420,9 @@ sub wq_close {
 }
 
 sub wq_kill_old {
-       my ($self) = @_;
+       my ($self, $sig) = @_;
        my $pids = $self->{"-wq_old_pids.$$"} or return;
-       kill 'TERM', @$pids;
+       kill($sig // 'TERM', @$pids);
 }
 
 sub wq_kill {
@@ -412,9 +437,11 @@ sub DESTROY {
        my ($self) = @_;
        my $ppid = $self->{-wq_ppid};
        wq_kill($self) if $ppid && $ppid == $$;
+       my $err = $?;
        wq_close($self);
        wq_wait_old($self);
        ipc_worker_stop($self);
+       $? = $err if $err;
 }
 
 sub detect_nproc () {