if ($pid == 0) {
eval { PublicInbox::DS->Reset };
close(delete $self->{-wq_s1});
- delete $self->{qw(-wq_workers -wq_quit)};
+ delete $self->{qw(-wq_workers -wq_quit -wq_ppid)};
my $quit = sub { $self->{-wq_quit} = 1 };
$SIG{$_} = $quit for (qw(TERM INT QUIT));
$SIG{$_} = 'IGNORE' for (qw(TTOU TTIN));
dwaitpid($pid, \&ipc_worker_reap, $self);
}
+# set or retrieve number of workers
+sub wq_workers {
+ my ($self, $nr) = @_;
+ my $cur = $self->{-wq_workers} or return;
+ if (defined $nr) {
+ while (scalar(keys(%$cur)) > $nr) {
+ $self->wq_worker_decr;
+ $self->wq_worker_decr_wait;
+ }
+ $self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
+ }
+ scalar(keys(%$cur));
+}
+
sub wq_close {
my ($self) = @_;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
+ my $ppid = delete $self->{-wq_ppid} or return;
my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
return if $ppid != $$; # can't reap siblings or parents
for my $pid (keys %$workers) {
}
}
+sub DESTROY {
+ wq_close($_[0]);
+ ipc_worker_stop($_[0]);
+}
+
1;
$SIG{__WARN__} = 'DEFAULT';
is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
-is(scalar(keys %{$ipc->{-wq_workers}}), 1, '1 worker started');
+is($ipc->wq_workers, 1, '1 worker started');
$ipc->wq_worker_incr;
-is(scalar(keys %{$ipc->{-wq_workers}}), 2, 'worker count bumped');
+is($ipc->wq_workers, 2, 'worker count bumped');
$ipc->wq_worker_decr;
$ipc->wq_worker_decr_wait(10);
-is(scalar(keys %{$ipc->{-wq_workers}}), 1, 'worker count lowered');
+is($ipc->wq_workers, 1, 'worker count lowered');
+is($ipc->wq_workers(2), 2, 'worker count set');
+is($ipc->wq_workers, 2, 'worker count stayed set');
+$ipc->wq_close;
+is($ipc->wq_workers, undef, 'workers undef after close');
done_testing;