use PublicInbox::Search qw(xap_terms);
use PublicInbox::Spawn qw(popen_rd spawn which);
use PublicInbox::MID qw(mids);
+use PublicInbox::Smsg;
use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
sub new {
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
- my %sig = $lei->atfork_child_wq($self);
- local @SIG{keys %sig} = values %sig;
+ $lei->atfork_child_wq($self);
my $startq = delete $lei->{startq};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei) = @_;
local $0 = "$0 query_mset";
- my %sig = $lei->atfork_child_wq($self);
- local @SIG{keys %sig} = values %sig;
+ $lei->atfork_child_wq($self);
my $startq = delete $lei->{startq};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
sub query_remote_mboxrd {
my ($self, $lei, $uris) = @_;
local $0 = "$0 query_remote_mboxrd";
- my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
- local @SIG{keys %sig} = values %sig;
+ $lei->atfork_child_wq($self);
my ($opt, $env) = @$lei{qw(opt env)};
my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
push(@qform, t => 1) if $opt->{thread};
my $has_l2m = exists $lei->{l2m};
for my $f (qw(lxs l2m)) {
my $wq = delete $lei->{$f} or next;
- $wq->wq_wait_old;
+ $wq->wq_wait_old($lei);
}
$lei->{ovv}->ovv_end($lei);
if ($has_l2m) { # close() calls LeiToMail reap_compress
sub query_prepare { # called by wq_do
my ($self, $lei) = @_;
local $0 = "$0 query_prepare";
- my %sig = $lei->atfork_child_wq($self);
- -p $lei->{op_pipe} or die "BUG: \$done pipe expected";
- local @SIG{keys %sig} = values %sig;
+ $lei->atfork_child_wq($self);
delete $lei->{l2m}->{-wq_s1};
eval { $lei->{l2m}->do_augment($lei) };
$lei->fail($@) if $@;
syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
}
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
- my ($lei) = @_;
- my $lxs = delete $lei->{lxs};
- if ($lxs && $lxs->wq_kill_old) {
- kill 'PIPE', $$;
- $lxs->wq_wait_old;
+sub fail_handler ($;$$) {
+ my ($lei, $code, $io) = @_;
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
}
- close(delete $lei->{1}) if $lei->{1};
+ close($io) if $io; # needed to avoid warnings on SIGPIPE
+ $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+ fail_handler($_[0], 13, delete $_[0]->{1});
}
sub do_query {
$lei->event_step_init; # wait for shutdowns
my $done_op = {
'' => [ \&query_done, $lei ],
- '!' => [ \&sigpipe_handler, $lei ]
+ '|' => [ \&sigpipe_handler, $lei ],
+ '!' => [ \&fail_handler, $lei ]
};
my $in_loop = exists $lei->{sock};
$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);