use PublicInbox::DS qw(dwaitpid);
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
use Errno qw(EMSGSIZE);
my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
PublicInbox::DS::sig_setmask($sigset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
- my $on_destroy = $self->ipc_atfork_child;
eval {
+ my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
ipc_worker_loop($self, $r_req, $w_res);
};
die "worker $ident PID:$$ died: $@\n" if $@;
- undef $on_destroy;
undef $end; # trigger exit
}
PublicInbox::DS::sig_setmask($sigset) unless $oldset;
# for base class, override in sub classes
sub ipc_atfork_prepare {}
+sub wq_atexit_child {}
+
sub ipc_atfork_child {
my ($self) = @_;
my $io = delete($self->{-ipc_atfork_child_close}) or return;
$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
}
-sub _recv_and_run {
+sub recv_and_run {
my ($self, $s2, $len, $full_stream) = @_;
my @fds = $recv_cmd->($s2, my $buf, $len);
- my $n = length($buf // '') or return;
+ return if scalar(@fds) && !defined($fds[0]);
+ my $n = length($buf) or return 0;
my $nfd = 0;
for my $fd (@fds) {
if (open(my $cmdfh, '+<&=', $fd)) {
sub wq_worker_loop ($) {
my ($self) = @_;
- my $len = $self->{wq_req_len} // (4096 * 33);
- my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
- 1 while (_recv_and_run($self, $s2, $len));
+ my $wqw = PublicInbox::WQWorker->new($self);
+ PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+ PublicInbox::DS->EventLoop;
+ PublicInbox::DS->Reset;
}
sub do_sock_stream { # via wq_do, for big requests
my ($self, $len) = @_;
- _recv_and_run($self, delete $self->{0}, $len, 1);
+ recv_and_run($self, delete $self->{0}, $len, 1);
}
sub wq_do { # always async
PublicInbox::DS::sig_setmask($oldset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
- my $on_destroy = $self->ipc_atfork_child;
eval {
+ my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
wq_worker_loop($self);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
- undef $on_destroy;
undef $end; # trigger exit
} else {
$self->{-wq_workers}->{$pid} = \undef;