+sub wq_sync_run {
+ my ($self, $wantarray, $sub, @args) = @_;
+ if ($wantarray) {
+ my @ret = eval { $self->$sub(@args) };
+ ipc_return($self->{0}, \@ret, $@);
+ } else { # '' => wantscalar
+ my $ret = eval { $self->$sub(@args) };
+ ipc_return($self->{0}, \$ret, $@);
+ }
+}
+
+sub wq_do {
+ my ($self, $sub, @args) = @_;
+ if (defined(wantarray)) {
+ pipe(my ($r, $w)) or die "pipe: $!";
+ wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
+ undef $w;
+ _wait_return($r, $sub);
+ } else {
+ wq_io_do($self, $sub, [], @args);
+ }
+}
+
+sub prepare_nonblock {
+ ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
+ require PublicInbox::WQBlocked;
+}
+
+sub wq_nonblock_do { # always async
+ my ($self, $sub, @args) = @_;
+ my $buf = ipc_freeze([$sub, @args]);
+ if ($self->{wqb}) { # saturated once, assume saturated forever
+ $self->{wqb}->flush_send($buf);
+ } else {
+ $send_cmd->($self->{-wq_s1}, [], $buf, MSG_EOR) //
+ ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
+ : croak("sendmsg: $!"));
+ }
+}
+
+sub _wq_worker_start {
+ my ($self, $oldset, $fields, $one, @cb_args) = @_;