+ warn "wq_io_do: $@" if $@;
+ delete @$self{0..$#$ios}; # don't close
+ }
+}
+
+sub wq_sync_run {
+ my ($self, $wantarray, $sub, @args) = @_;
+ if ($wantarray) {
+ my @ret = eval { $self->$sub(@args) };
+ ipc_return($self->{0}, \@ret, $@);
+ } else { # '' => wantscalar
+ my $ret = eval { $self->$sub(@args) };
+ ipc_return($self->{0}, \$ret, $@);
+ }
+}
+
+sub wq_do {
+ my ($self, $sub, @args) = @_;
+ if (defined(wantarray)) {
+ pipe(my ($r, $w)) or die "pipe: $!";
+ wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
+ undef $w;
+ _wait_return($r, $sub);
+ } else {
+ wq_io_do($self, $sub, [], @args);
+ }
+}
+
+sub prepare_nonblock {
+ ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
+ $_[0]->{-reap_async} or die 'BUG: {-reap_async} needed for nonblock';
+ require PublicInbox::WQBlocked;
+}
+
+sub wq_nonblock_do { # always async
+ my ($self, $sub, @args) = @_;
+ my $buf = ipc_freeze([$sub, @args]);
+ if ($self->{wqb}) { # saturated once, assume saturated forever
+ $self->{wqb}->flush_send($buf);
+ } else {
+ $send_cmd->($self->{-wq_s1}, [], $buf, MSG_EOR) //
+ ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
+ : croak("sendmsg: $!"));
+ }
+}
+
+sub _wq_worker_start ($$$$) {
+ my ($self, $oldset, $fields, $one) = @_;
+ my ($bcast1, $bcast2);
+ $one or socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
+ die "socketpair: $!";
+ my $seed = rand(0xffffffff);
+ my $pid = fork // die "fork: $!";
+ if ($pid == 0) {
+ srand($seed);
+ eval { Net::SSLeay::randomize() };
+ undef $bcast1;
+ eval { PublicInbox::DS->Reset };
+ delete @$self{qw(-wq_s1 -wq_ppid)};
+ $self->{-wq_worker_nr} =
+ keys %{delete($self->{-wq_workers}) // {}};
+ $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
+ local $0 = $one ? $self->{-wq_ident} :
+ "$self->{-wq_ident} $self->{-wq_worker_nr}";
+ # ensure we properly exit even if warn() dies:
+ my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
+ eval {
+ $fields //= {};
+ local @$self{keys %$fields} = values(%$fields);
+ my $on_destroy = $self->ipc_atfork_child;
+ local @SIG{keys %SIG} = values %SIG;
+ PublicInbox::DS::sig_setmask($oldset);
+ wq_worker_loop($self, $bcast2);
+ };
+ warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+ undef $end; # trigger exit
+ } else {
+ $self->{-wq_workers}->{$pid} = $bcast1;