X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FIPC.pm;h=548a72ebb11c70dba1ba1360fdc8d6d5598b9c95;hb=7b654d175cf2e31b4354929ea678563f534947e5;hp=edc5ba6486f12a888432d3828fdd4e7f354cff75;hpb=f9557e2c6b7510d278310066fe7c26f84cae6e1d;p=public-inbox.git diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index edc5ba64..548a72eb 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -96,7 +96,7 @@ sub ipc_worker_loop ($$$) { # starts a worker if Sereal or Storable is installed sub ipc_worker_spawn { - my ($self, $ident, $oldset, $fields) = @_; + my ($self, $ident, $oldset, $fields, @cb_args) = @_; return if ($self->{-ipc_ppid} // -1) == $$; # idempotent delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)}); pipe(my ($r_req, $w_req)) or die "pipe: $!"; @@ -133,28 +133,20 @@ sub ipc_worker_spawn { $self->{-ipc_req} = $w_req; $self->{-ipc_res} = $r_res; $self->{-ipc_ppid} = $$; - awaitpid($pid, \&ipc_worker_reap, $self); + awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); $self->{-ipc_pid} = $pid; } sub ipc_worker_reap { # awaitpid callback - my ($pid, $self) = @_; + my ($pid, $self, $cb, @args) = @_; delete $self->{-wq_workers}->{$pid}; - if (my $cb_args = $self->{-reap_do}) { - return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]); - } + return $cb->($pid, $self, @args) if $cb; return if !$?; my $s = $? & 127; # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13 } -# register wait workers -sub awaitpid_init { - my ($self, @cb_args) = @_; - $self->{-reap_do} = \@cb_args; -} - # for base class, override in sub classes sub ipc_atfork_prepare {} @@ -347,7 +339,6 @@ sub wq_do { sub prepare_nonblock { ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0); - $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock'; require PublicInbox::WQBlocked; } @@ -363,8 +354,8 @@ sub wq_nonblock_do { # always async } } -sub _wq_worker_start ($$$$) { - my ($self, $oldset, $fields, $one) = @_; +sub _wq_worker_start { + my ($self, $oldset, $fields, $one, @cb_args) = @_; my ($bcast1, $bcast2); $one or socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; @@ -395,13 +386,13 @@ sub _wq_worker_start ($$$$) { undef $end; # trigger exit } else { $self->{-wq_workers}->{$pid} = $bcast1; - awaitpid($pid, \&ipc_worker_reap, $self); + awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); } } # starts workqueue workers if Sereal or Storable is installed sub wq_workers_start { - my ($self, $ident, $nr_workers, $oldset, $fields) = @_; + my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_; ($send_cmd && $recv_cmd && defined($SEQPACKET)) or return; return if $self->{-wq_s1}; # idempotent $self->{-wq_s1} = $self->{-wq_s2} = undef; @@ -414,7 +405,9 @@ sub wq_workers_start { $self->{-wq_ident} = $ident; my $one = $nr_workers == 1; $self->{-wq_nr_workers} = $nr_workers; - _wq_worker_start($self, $sigset, $fields, $one) for (1..$nr_workers); + for (1..$nr_workers) { + _wq_worker_start($self, $sigset, $fields, $one, @cb_args); + } PublicInbox::DS::sig_setmask($sigset) unless $oldset; $self->{-wq_ppid} = $$; } @@ -422,11 +415,10 @@ sub wq_workers_start { sub wq_close { my ($self) = @_; if (my $wqb = delete $self->{wqb}) { - $self->{-reap_do} or die 'BUG: {-reap_do} unset'; $wqb->enq_close; } delete @$self{qw(-wq_s1 -wq_s2)} or return; - return if $self->{-reap_do}; + return if ($self->{-wq_ppid} // -1) != $$; awaitpid($_) for keys %{$self->{-wq_workers}}; }