X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FIPC.pm;h=2aeb64623cb0e6dd80895f1c36c7a50ebdce6939;hb=3b2b02a411b161e2392c3a5b1c376c83573b027e;hp=efac4c4dd9a145372a96d2de0e37d6b7ff2e1252;hpb=fb8b16ff2b40ecd22ebbdea0d27069749e800077;p=public-inbox.git diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index efac4c4d..2aeb6462 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -240,8 +240,9 @@ sub recv_and_run { } sub wq_worker_loop ($) { - my ($self) = @_; + my ($self, $bcast_a) = @_; my $wqw = PublicInbox::WQWorker->new($self); + PublicInbox::WQWorker->new($self, '-wq_bcast2'); PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} }); PublicInbox::DS->EventLoop; PublicInbox::DS->Reset; @@ -252,6 +253,20 @@ sub do_sock_stream { # via wq_io_do, for big requests recv_and_run($self, delete $self->{0}, $len, 1); } +sub wq_broadcast { + my ($self, $sub, @args) = @_; + if (my $wkr = $self->{-wq_workers}) { + for my $bcast1 (values %$wkr) { + my $buf = ipc_freeze([$sub, @args]); + send($bcast1, $buf, MSG_EOR) // croak "send: $!"; + # XXX shouldn't have to deal with EMSGSIZE here... + } + } else { + eval { $self->$sub(@args) }; + warn "wq_broadcast: $@" if $@; + } +} + sub wq_io_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker @@ -284,15 +299,21 @@ sub wq_io_do { # always async sub _wq_worker_start ($$$) { my ($self, $oldset, $fields) = @_; + my ($bcast1, $bcast2); + socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or + die "socketpair: $!"; my $seed = rand(0xffffffff); my $pid = fork // die "fork: $!"; if ($pid == 0) { srand($seed); + undef $bcast1; eval { PublicInbox::DS->Reset }; - delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)}; + delete @$self{qw(-wq_s1 -wq_ppid)}; + $self->{-wq_worker_nr} = + keys %{delete($self->{-wq_workers}) // {}}; $SIG{$_} = 'IGNORE' for (qw(PIPE)); $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD)); - local $0 = $self->{-wq_ident}; + local $0 = "$self->{-wq_ident} $self->{-wq_worker_nr}"; # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); eval { @@ -301,12 +322,13 @@ sub _wq_worker_start ($$$) { my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; PublicInbox::DS::sig_setmask($oldset); + $self->{-wq_bcast2} = $bcast2; wq_worker_loop($self); }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $end; # trigger exit } else { - $self->{-wq_workers}->{$pid} = \undef; + $self->{-wq_workers}->{$pid} = $bcast1; } }