-sub wq_worker_incr { # SIGTTIN handler
- my ($self, $oldset, $fields) = @_;
- $self->{-wq_s2} or return;
- return if wq_workers($self) >= $WQ_MAX_WORKERS;
- $self->ipc_atfork_prepare;
- my $sigset = $oldset // PublicInbox::DS::block_signals();
- _wq_worker_start($self, $sigset, $fields);
- PublicInbox::DS::sig_setmask($sigset) unless $oldset;
-}
-
-sub wq_exit { # wakes up wq_worker_decr_wait
- send($_[0]->{-wq_s2}, $$, MSG_EOR) // die "$$ send: $!";
- exit;
-}
-
-sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
- my ($self) = @_;
- return unless wq_workers($self);
- my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
- $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
- # caller must call wq_worker_decr_wait in main loop
-}
-
-sub wq_worker_decr_wait {
- my ($self, $timeout) = @_;
- return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
- my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
- vec(my $rin = '', fileno($s1), 1) = 1;
- select(my $rout = $rin, undef, undef, $timeout) or
- croak 'timed out waiting for wq_exit';
- recv($s1, my $pid, 64, 0) // croak "recv: $!";
- my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
- delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
- dwaitpid($pid, \&ipc_worker_reap, $self);
-}
-
-# set or retrieve number of workers
-sub wq_workers {
- my ($self, $nr) = @_;
- my $cur = $self->{-wq_workers} or return;
- if (defined $nr) {
- while (scalar(keys(%$cur)) > $nr) {
- $self->wq_worker_decr;
- $self->wq_worker_decr_wait;
- }
- $self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
- }
- scalar(keys(%$cur));
-}
-