$n;
}
-sub wq_worker_loop ($) {
- my ($self, $bcast_a) = @_;
- my $wqw = PublicInbox::WQWorker->new($self);
- PublicInbox::WQWorker->new($self, '-wq_bcast2');
+sub wq_worker_loop ($$) {
+ my ($self, $bcast2) = @_;
+ my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
+ PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
PublicInbox::DS->EventLoop;
PublicInbox::DS->Reset;
sub _wq_worker_start ($$$) {
my ($self, $oldset, $fields) = @_;
my ($bcast1, $bcast2);
- socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
- die "socketpair: $!";
+ $self->{-wq_no_bcast} or
+ socketpair($bcast1, $bcast2, AF_UNIX, $SEQPACKET, 0) or
+ die "socketpair: $!";
my $seed = rand(0xffffffff);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
PublicInbox::DS::sig_setmask($oldset);
- $self->{-wq_bcast2} = $bcast2;
- wq_worker_loop($self);
+ wq_worker_loop($self, $bcast2);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
undef $end; # trigger exit
use IO::Handle (); # blocking
sub new {
- my ($cls, $wq, $field) = @_;
- my $s2 = $wq->{$field // '-wq_s2'} // die "BUG: no {$field}";
- $s2->blocking(0);
- my $self = bless { sock => $s2, wq => $wq }, $cls;
- $self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
+ my ($cls, $wq, $sock) = @_;
+ $sock->blocking(0);
+ my $self = bless { sock => $sock, wq => $wq }, $cls;
+ $self->SUPER::new($sock, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
$self;
}