-sub wq_do { # always async
- my ($self, $sub, $in, $out, $err, @args) = @_;
- if (my $s1 = $self->{-wq_seq}) { # run in worker
- $_ = fileno($_) for ($in, $out, $err);
- $send_cmd->($s1, $in, $out, $err,
- freeze([$sub, @args]), MSG_EOR);
+sub do_sock_stream { # via wq_io_do, for big requests
+ my ($self, $len) = @_;
+ recv_and_run($self, delete $self->{0}, $len, 1);
+}
+
+sub wq_io_do { # always async
+ my ($self, $sub, $ios, @args) = @_;
+ if (my $s1 = $self->{-wq_s1}) { # run in worker
+ my $fds = [ map { fileno($_) } @$ios ];
+ my $buf = ipc_freeze([$sub, @args]);
+ my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+ return if defined($n); # likely
+ croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
+ croak "sendmsg: $!" if !$!{EMSGSIZE};
+ socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
+ croak "socketpair: $!";
+ $n = $send_cmd->($s1, [ fileno($r) ],
+ ipc_freeze(['do_sock_stream', length($buf)]),
+ MSG_EOR) // croak "sendmsg: $!";
+ undef $r;
+ $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+ while ($n < length($buf)) {
+ my $x = syswrite($w, $buf, length($buf) - $n, $n) //
+ croak "syswrite: $!";
+ croak "syswrite wrote 0 bytes" if $x == 0;
+ $n += $x;
+ }