-sub wq_worker_loop ($) {
- my ($self) = @_;
- my $len = $self->{wq_req_len} // (4096 * 33);
- my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
- while (1) {
- my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
- my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
- my $nfd = 0;
- for my $fd (@fds) {
- my $mode = shift(@m);
- if (open(my $cmdfh, $mode, $fd)) {
- $self->{$nfd++} = $cmdfh;
- $cmdfh->autoflush(1);
- } else {
- die "$$ open($mode$fd) (FD:$nfd): $!";
- }
+sub recv_and_run {
+ my ($self, $s2, $len, $full_stream) = @_;
+ my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
+ return if scalar(@fds) && !defined($fds[0]);
+ my $n = length($buf) or return 0;
+ my $nfd = 0;
+ for my $fd (@fds) {
+ if (open(my $cmdfh, '+<&=', $fd)) {
+ $self->{$nfd++} = $cmdfh;
+ $cmdfh->autoflush(1);
+ } else {
+ die "$$ open(+<&=$fd) (FD:$nfd): $!";
+ }
+ }
+ while ($full_stream && $n < $len) {
+ my $r = sysread($s2, $buf, $len - $n, $n) // croak "read: $!";
+ croak "read EOF after $n/$len bytes" if $r == 0;
+ $n = length($buf);
+ }
+ # Sereal dies on truncated data, Storable returns undef
+ my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
+ undef $buf;
+ my $sub = shift @$args;
+ eval { $self->$sub(@$args) };
+ warn "$$ $0 wq_worker: $sub: $@" if $@;
+ delete @$self{0..($nfd-1)};
+ $n;
+}
+
+sub wq_worker_loop ($$) {
+ my ($self, $bcast2) = @_;
+ my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
+ PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
+ PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+ PublicInbox::DS::event_loop();
+ PublicInbox::DS->Reset;
+}
+
+sub do_sock_stream { # via wq_io_do, for big requests
+ my ($self, $len) = @_;
+ recv_and_run($self, my $s2 = delete $self->{0}, $len, 1);
+}
+
+sub wq_broadcast {
+ my ($self, $sub, @args) = @_;
+ if (my $wkr = $self->{-wq_workers}) {
+ my $buf = ipc_freeze([$sub, @args]);
+ for my $bcast1 (values %$wkr) {
+ my $sock = $bcast1 // $self->{-wq_s1} // next;
+ send($sock, $buf, MSG_EOR) // croak "send: $!";
+ # XXX shouldn't have to deal with EMSGSIZE here...