sub wq_wait_async {
my ($self, $cb, @uargs) = @_;
local $PublicInbox::DS::in_loop = 1;
- $self->{-reap_async} = 1;
$self->{-reap_do} = $cb;
my @pids = keys %{$self->{-wq_workers}};
dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
# idempotent, can be called regardless of whether worker is active or not
sub ipc_worker_stop {
- my ($self, $args) = @_;
+ my ($self) = @_;
my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
if (!$w_req && !$r_res) {
$w_req = $r_res = undef;
return if $$ != $ppid;
- dwaitpid($pid, \&ipc_worker_reap, [$self, $args]);
+ dwaitpid($pid, \&ipc_worker_reap, [$self]);
}
# use this if we have multiple readers reading curl or "pigz -dc"
sub prepare_nonblock {
($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
- $_[0]->{-reap_async} or die 'BUG: {-reap_async} needed for nonblock';
+ $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock';
require PublicInbox::WQBlocked;
}
sub wq_close {
my ($self) = @_;
if (my $wqb = delete $self->{wqb}) {
- $self->{-reap_async} or die 'BUG: {-reap_async} unset';
+ $self->{-reap_do} or die 'BUG: {-reap_do} unset';
$wqb->enq_close;
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- return if $self->{-reap_async};
+ return if $self->{-reap_do};
my @pids = keys %{$self->{-wq_workers}};
dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
}