]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei_to_mail: reduce spew on Maildir removal
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index e69b637c7a29d1ef0da2e4c86780c5d4ba112c21..f630e79a9b1cb48687e5e1c8f2fbb3d3e615180b 100644 (file)
@@ -16,6 +16,7 @@ use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
+use PublicInbox::Smsg;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 
 sub new {
@@ -109,8 +110,7 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
        local $0 = "$0 query_thread_mset";
-       my %sig = $lei->atfork_child_wq($self);
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
        my $startq = delete $lei->{startq};
 
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
@@ -145,8 +145,7 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei) = @_;
        local $0 = "$0 query_mset";
-       my %sig = $lei->atfork_child_wq($self);
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
        my $startq = delete $lei->{startq};
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
@@ -187,8 +186,7 @@ sub kill_reap {
 sub query_remote_mboxrd {
        my ($self, $lei, $uris) = @_;
        local $0 = "$0 query_remote_mboxrd";
-       my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
        my ($opt, $env) = @$lei{qw(opt env)};
        my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
        push(@qform, t => 1) if $opt->{thread};
@@ -286,7 +284,7 @@ sub query_done { # EOF callback
        my $has_l2m = exists $lei->{l2m};
        for my $f (qw(lxs l2m)) {
                my $wq = delete $lei->{$f} or next;
-               $wq->wq_wait_old;
+               $wq->wq_wait_old($lei);
        }
        $lei->{ovv}->ovv_end($lei);
        if ($has_l2m) { # close() calls LeiToMail reap_compress
@@ -351,23 +349,24 @@ sub start_query { # always runs in main (lei-daemon) process
 sub query_prepare { # called by wq_do
        my ($self, $lei) = @_;
        local $0 = "$0 query_prepare";
-       my %sig = $lei->atfork_child_wq($self);
-       -p $lei->{op_pipe} or die "BUG: \$done pipe expected";
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
        delete $lei->{l2m}->{-wq_s1};
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
        syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-       my ($lei) = @_;
-       my $lxs = delete $lei->{lxs};
-       if ($lxs && $lxs->wq_kill_old) {
-               kill 'PIPE', $$;
-               $lxs->wq_wait_old;
+sub fail_handler ($;$$) {
+       my ($lei, $code, $io) = @_;
+       if (my $lxs = delete $lei->{lxs}) {
+               $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
        }
-       close(delete $lei->{1}) if $lei->{1};
+       close($io) if $io; # needed to avoid warnings on SIGPIPE
+       $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+       fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
 sub do_query {
@@ -388,7 +387,8 @@ sub do_query {
        $lei->event_step_init; # wait for shutdowns
        my $done_op = {
                '' => [ \&query_done, $lei ],
-               '!' => [ \&sigpipe_handler, $lei ]
+               '|' => [ \&sigpipe_handler, $lei ],
+               '!' => [ \&fail_handler, $lei ]
        };
        my $in_loop = exists $lei->{sock};
        $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);