-sub _wq_worker_start ($$$$) {
- my ($self, $oldset, $fields, $one) = @_;
+sub prepare_nonblock {
+ ($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
+ require PublicInbox::WQBlocked;
+}
+
+sub wq_nonblock_do { # always async
+ my ($self, $sub, @args) = @_;
+ my $buf = ipc_freeze([$sub, @args]);
+ if ($self->{wqb}) { # saturated once, assume saturated forever
+ $self->{wqb}->flush_send($buf);
+ } else {
+ $send_cmd->($self->{-wq_s1}, [], $buf, MSG_EOR) //
+ ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
+ : croak("sendmsg: $!"));
+ }
+}
+
+sub _wq_worker_start {
+ my ($self, $oldset, $fields, $one, @cb_args) = @_;