@$io = ();
}
-sub query_prepare { # for wq_do,
+sub query_prepare { # called by wq_do
my ($self, $lei) = @_;
my %sig = $lei->atfork_child_wq($self);
local @SIG{keys %sig} = values %sig;
$lei->fail($@) if $@;
}
+sub sigpipe_handler {
+ my ($self, $lei_orig, $pids) = @_;
+ if ($pids) { # one-shot (no event loop)
+ kill 'TERM', @$pids;
+ kill 'PIPE', $$;
+ } else {
+ $self->wq_kill;
+ $self->wq_close;
+ }
+ close(delete $lei_orig->{1}) if $lei_orig->{1};
+}
+
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
pipe(my $done, $io[0]) or die "pipe $!";
$lei_orig->event_step_init; # wait for shutdowns
- my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
+ my $done_op = {
+ '' => [ \&query_done, $self, $lei_orig ],
+ '!' => [ \&sigpipe_handler, $self, $lei_orig ]
+ };
my $in_loop = exists $lei_orig->{sock};
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
my $l2m = $lei->{l2m};
my @l2m_io = (undef, @io[1..$#io]);
pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
$self->wq_do('query_prepare', \@l2m_io, $lei);
- $io[4] //= *STDERR{GLOB};
+ $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
$io[5] = $startq;
unless ($in_loop) {
my @pids = $self->wq_close;
# for the $lei->atfork_child_wq PIPE handler:
- $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+ $done_op->{'!'}->[3] = \@pids;
$done->event_step;
my $ipc_worker_reap = $self->can('ipc_worker_reap');
if (my $l2m_pids = delete $self->{l2m_pids}) {