]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/IPC.pm
ipc: add wq_broadcast
[public-inbox.git] / lib / PublicInbox / IPC.pm
index efac4c4dd9a145372a96d2de0e37d6b7ff2e1252..2aeb64623cb0e6dd80895f1c36c7a50ebdce6939 100644 (file)
@@ -240,8 +240,9 @@ sub recv_and_run {
 }
 
 sub wq_worker_loop ($) {
-       my ($self) = @_;
+       my ($self, $bcast_a) = @_;
        my $wqw = PublicInbox::WQWorker->new($self);
+       PublicInbox::WQWorker->new($self, '-wq_bcast2');
        PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
        PublicInbox::DS->EventLoop;
        PublicInbox::DS->Reset;
@@ -252,6 +253,20 @@ sub do_sock_stream { # via wq_io_do, for big requests
        recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
+sub wq_broadcast {
+       my ($self, $sub, @args) = @_;
+       if (my $wkr = $self->{-wq_workers}) {
+               for my $bcast1 (values %$wkr) {
+                       my $buf = ipc_freeze([$sub, @args]);
+                       send($bcast1, $buf, MSG_EOR) // croak "send: $!";
+                       # XXX shouldn't have to deal with EMSGSIZE here...
+               }
+       } else {
+               eval { $self->$sub(@args) };
+               warn "wq_broadcast: $@" if $@;
+       }
+}
+
 sub wq_io_do { # always async
        my ($self, $sub, $ios, @args) = @_;
        if (my $s1 = $self->{-wq_s1}) { # run in worker
@@ -284,15 +299,21 @@ sub wq_io_do { # always async
 
 sub _wq_worker_start ($$$) {
        my ($self, $oldset, $fields) = @_;
+       my ($bcast1, $bcast2);
+       socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
+                                               die "socketpair: $!";
        my $seed = rand(0xffffffff);
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
                srand($seed);
+               undef $bcast1;
                eval { PublicInbox::DS->Reset };
-               delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
+               delete @$self{qw(-wq_s1 -wq_ppid)};
+               $self->{-wq_worker_nr} =
+                               keys %{delete($self->{-wq_workers}) // {}};
                $SIG{$_} = 'IGNORE' for (qw(PIPE));
                $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
-               local $0 = $self->{-wq_ident};
+               local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}";
                # ensure we properly exit even if warn() dies:
                my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
                eval {
@@ -301,12 +322,13 @@ sub _wq_worker_start ($$$) {
                        my $on_destroy = $self->ipc_atfork_child;
                        local %SIG = %SIG;
                        PublicInbox::DS::sig_setmask($oldset);
+                       $self->{-wq_bcast2} = $bcast2;
                        wq_worker_loop($self);
                };
                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
                undef $end; # trigger exit
        } else {
-               $self->{-wq_workers}->{$pid} = \undef;
+               $self->{-wq_workers}->{$pid} = $bcast1;
        }
 }