]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei + ipc: simplify process reaping
authorEric Wong <e@80x24.org>
Fri, 15 Oct 2021 13:30:55 +0000 (13:30 +0000)
committerEric Wong <e@80x24.org>
Fri, 15 Oct 2021 15:58:24 +0000 (15:58 +0000)
Simplify our APIs and force dwaitpid() to work in async mode for
all lei workers.  This avoids having lingering zombies for
parallel searches if one worker finishes soon before another.

The old distinction between "old" and "new" workers was
needlessly complex, error-prone, and embarrasingly bad.

We also never handled v2:// writers properly before on
Ctrl-C/Ctrl-Z (SIGINT/SIGTSTP), so add them to @WQ_KEYS
to ensure they get handled by $lei when appropropriate.

16 files changed:
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiBlob.pm
lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiImportKw.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiInspect.pm
lib/PublicInbox/LeiLsSearch.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiNoteEvent.pm
lib/PublicInbox/LeiP2q.pm
lib/PublicInbox/LeiPmdir.pm
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiUp.pm
lib/PublicInbox/LeiXSearch.pm

index 6c189b6410aa3798a736a6071a96826815b651be..3e299448b334629fb6427e7df02976381ec19242 100644 (file)
@@ -134,16 +134,22 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
        my ($args, $pid) = @_;
+       my ($self, @uargs) = @$args;
+       delete $self->{-wq_workers}->{$pid};
+       return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
        return if !$?;
-       # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
        my $s = $? & 127;
-       warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
+       # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+       warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
 }
 
-sub wq_wait_old {
-       my ($self, $cb, @args) = @_;
-       my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-       dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
+sub wq_wait_async {
+       my ($self, $cb, @uargs) = @_;
+       local $PublicInbox::DS::in_loop = 1;
+       $self->{-reap_async} = 1;
+       $self->{-reap_do} = $cb;
+       my @pids = keys %{$self->{-wq_workers}};
+       dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
 }
 
 # for base class, override in sub classes
@@ -394,42 +400,24 @@ sub wq_workers_start {
 }
 
 sub wq_close {
-       my ($self, $nohang, $cb, @args) = @_;
+       my ($self) = @_;
        delete @$self{qw(-wq_s1 -wq_s2)} or return;
-       my $ppid = delete $self->{-wq_ppid} or return;
-       my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
-       return if $ppid != $$; # can't reap siblings or parents
-       my @pids = map { $_ + 0 } keys %$workers;
-       if ($nohang) {
-               push @{$self->{"-wq_old_pids.$$"}}, @pids;
-       } else {
-               $cb //= \&ipc_worker_reap;
-               unshift @args, $self;
-               dwaitpid($_, $cb, \@args) for @pids;
-       }
-}
-
-sub wq_kill_old {
-       my ($self, $sig) = @_;
-       my $pids = $self->{"-wq_old_pids.$$"} or return;
-       kill($sig // 'TERM', @$pids);
+       return if $self->{-reap_async};
+       my @pids = keys %{$self->{-wq_workers}};
+       dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
 }
 
 sub wq_kill {
        my ($self, $sig) = @_;
-       my $workers = $self->{-wq_workers} or return;
-       kill($sig // 'TERM', keys %$workers);
+       kill($sig // 'TERM', keys %{$self->{-wq_workers}});
 }
 
 sub DESTROY {
        my ($self) = @_;
        my $ppid = $self->{-wq_ppid};
        wq_kill($self) if $ppid && $ppid == $$;
-       my $err = $?;
        wq_close($self);
-       wq_wait_old($self);
        ipc_worker_stop($self);
-       $? = $err if $err;
 }
 
 sub detect_nproc () {
index b6338377328f01a4cb382a13b2bf73e677cafa74..83534878ee693683b4ab493b60b7765286f762a1 100644 (file)
@@ -455,16 +455,12 @@ my %CONFIG_KEYS = (
        'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
 
 sub _drop_wq {
        my ($self) = @_;
        for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
-               if ($wq->wq_kill('-TERM')) {
-                       $wq->wq_close(0, undef, $self);
-               } elsif ($wq->wq_kill_old('-TERM')) {
-                       $wq->wq_wait_old(undef, $self);
-               }
+               $wq->wq_kill('-TERM');
                $wq->DESTROY;
        }
 }
@@ -644,6 +640,7 @@ sub workers_start {
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
+       $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        ($op_c, $ops);
 }
 
@@ -651,7 +648,7 @@ sub workers_start {
 sub wait_wq_events {
        my ($lei, $op_c, $ops) = @_;
        for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
-               $wq->wq_close(1);
+               $wq->wq_close;
        }
        $op_c->{ops} = $ops;
 }
@@ -1150,7 +1147,7 @@ sub event_step {
                if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
                        my $sig = "-$buf";
                        for my $wq (grep(defined, @$self{@WQ_KEYS})) {
-                               $wq->wq_kill($sig) or $wq->wq_kill_old($sig);
+                               $wq->wq_kill($sig);
                        }
                } else {
                        die "unrecognized client signal: $buf";
@@ -1393,8 +1390,7 @@ sub fchdir {
 sub wq_eof { # EOF callback for main daemon
        my ($lei) = @_;
        local $current_lei = $lei;
-       my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
-       $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+       delete $lei->{wq1} // return $lei->fail; # already failed
 }
 
 sub watch_state_ok ($) {
index b6a62d246657f4d0b0d2c56efa5d6a1ef3f4de68..004b156c3396acde3e419bbe723e7cd144ebcf4d 100644 (file)
@@ -166,7 +166,7 @@ sub lei_blob {
        my ($op_c, $ops) = $lei->workers_start($self, 1);
        $lei->{wq1} = $self;
        $self->wq_io_do('do_solve_blob', []);
-       $self->wq_close(1);
+       $self->wq_close;
        $lei->wait_wq_events($op_c, $ops);
 }
 
index 9e98edc391a0e70c4227e549c64ed92a154c7630..68fc7c0ba2b0b2932784e5d00bb7602e896a2071 100644 (file)
@@ -58,7 +58,7 @@ sub lei_convert { # the main "lei convert" method
        my ($op_c, $ops) = $lei->workers_start($self, 1);
        $lei->{wq1} = $self;
        $self->wq_io_do('process_inputs', []);
-       $self->wq_close(1);
+       $self->wq_close;
        $lei->wait_wq_events($op_c, $ops);
 }
 
index 8359f3386668b19101a8060cd932d20422d7cc35..54454511f7bbee48dae0103d84c3a1bae3df16c4 100644 (file)
@@ -50,7 +50,6 @@ sub _lei_wq_eof { # EOF callback for main lei daemon
        my ($lei) = @_;
        my $ikw = delete $lei->{ikw} or return $lei->fail;
        $lei->sto_done_request($ikw->{lei_sock});
-       $ikw->wq_wait_old($lei->can('wq_done_wait'), $lei);
 }
 
 1;
index dd40d83840c57cb2e790fd86d30699b8a11f60c5..2621fc1f9d056a2e2c0e2a837f5efb4562b94b56 100644 (file)
@@ -402,7 +402,7 @@ sub input_only_atfork_child {
 sub input_only_net_merge_all_done {
        my ($self) = @_;
        $self->wq_io_do('process_inputs');
-       $self->wq_close(1);
+       $self->wq_close;
 }
 
 # like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
index 5ba96056bc65b14589cb49910af5fd54faab1f83..05b6e21d298d66e5723dfe5e7faeb17e4317ffc4 100644 (file)
@@ -242,7 +242,7 @@ sub inspect_start ($$) {
        $lei->{wq1} = $self;
        $lei->wait_wq_events($op_c, $ops);
        $self->wq_do('inspect_argv');
-       $self->wq_close(1);
+       $self->wq_close;
 }
 
 sub ins_add { # InputPipe->consume callback
index aebf018429d3949ca9e2de8704d8ce680604e7f0..0193e590b8b06c202d4034330e3eaf24e79e2344 100644 (file)
@@ -75,7 +75,7 @@ sub bg_worker ($$$) {
        my ($op_c, $ops) = $lei->workers_start($self, 1);
        $lei->{wq1} = $self;
        $self->wq_io_do('do_ls_search_long', [], $pfx);
-       $self->wq_close(1);
+       $self->wq_close;
        $lei->wait_wq_events($op_c, $ops);
 }
 
index a75c99c4987f27906185b3921de10eb006314f4b..e20d30b48546fc9c1344d78b9d699858af4db0a6 100644 (file)
@@ -451,7 +451,7 @@ sub start {
        my ($op_c, $ops) = $lei->workers_start($self, 1);
        $lei->{wq1} = $self;
        $self->wq_io_do('do_mirror', []);
-       $self->wq_close(1);
+       $self->wq_close;
        $lei->wait_wq_events($op_c, $ops);
 }
 
index 1b714dae439326b5450ab8253855d484aff7a06f..ba4dfd49a43863ccce8994991b1e6ea7988817c7 100644 (file)
@@ -14,7 +14,7 @@ our $to_flush; # { cfgpath => $lei }
 sub flush_lei ($) {
        my ($lei) = @_;
        my $lne = delete $lei->{cfg}->{-lei_note_event};
-       $lne->wq_close(1, undef, $lei) if $lne; # runs _lei_wq_eof;
+       $lne->wq_close if $lne; # runs _lei_wq_eof;
 }
 
 # we batch up writes and flush every 5s (matching Linux default
@@ -111,9 +111,8 @@ sub ipc_atfork_child {
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
        my ($lei) = @_;
-       my $lne = delete $lei->{lne} or return $lei->fail;
+       delete $lei->{lne} or return $lei->fail;
        $lei->sto_done_request;
-       $lne->wq_wait_old($lei->can('wq_done_wait'), $lei);
 }
 
 1;
index 5c2ce0a13dfc85eda2bf28017f5fa0ac31712754..08ec81c5295ee3c43831dad38e3ef313c1e51dd6 100644 (file)
@@ -191,7 +191,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point
        my ($op_c, $ops) = $lei->workers_start($self, 1);
        $lei->{wq1} = $self;
        $self->wq_io_do('do_p2q', []);
-       $self->wq_close(1);
+       $self->wq_close;
        $lei->wait_wq_events($op_c, $ops);
 }
 
index 2d3b9755534b9ce23b7d293719ef5b869190d78f..f9b68fc2a2d83b660f6101d97fc20e96136f3f45 100644 (file)
@@ -51,7 +51,6 @@ sub _lei_wq_eof { # EOF callback for main lei daemon
        my ($lei) = @_;
        my $pmd = delete $lei->{pmd} or return $lei->fail;
        $lei->sto_done_request($pmd->{lei_sock});
-       $pmd->wq_wait_old($lei->can('wq_done_wait'), $lei);
 }
 
 1;
index c45380d1c647af73e12d1029fbd012af8a80adab..821045701dfe53398ad89ebddeb7853e54a4a299 100644 (file)
@@ -587,6 +587,7 @@ sub write_prepare {
                                        -err_wr => $w,
                                        to_close => [ $r ],
                                });
+               $self->wq_wait_async; # outlives $lei
                require PublicInbox::LeiStoreErr;
                PublicInbox::LeiStoreErr->new($r, $lei);
        }
index 9c748deaed161db7b9e2b02674a521fcc6ce5e0a..76e103c7c2351638902bac42cd6290488e9e761d 100644 (file)
@@ -637,6 +637,12 @@ sub _do_augment_mbox {
        $dedupe->pause_dedupe if $dedupe;
 }
 
+sub v2w_done_wait { # dwaitpid callback
+       my ($arg, $pid) = @_;
+       my ($v2w, $lei) = @$arg;
+       $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
+}
+
 sub _pre_augment_v2 {
        my ($self, $lei) = @_;
        my $dir = $self->{dst};
@@ -659,8 +665,8 @@ sub _pre_augment_v2 {
        PublicInbox::InboxWritable->new($ibx, @creat);
        $ibx->init_inbox if @creat;
        my $v2w = $ibx->importer;
-       $v2w->{-wq_no_bcast} = 1;
        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+       $v2w->wq_wait_async(\&v2w_done_wait, $lei);
        $lei->{v2w} = $v2w;
        return if !$lei->{opt}->{shared};
        my $d = "$lei->{ale}->{git}->{git_dir}/objects";
@@ -811,7 +817,7 @@ sub net_merge_all_done {
                                $self->{dst}, \$self->{-au_noted});
        }
        $self->wq_broadcast('do_post_auth');
-       $self->wq_close(1);
+       $self->wq_close;
 }
 
 1;
index df65cb9b8474af2e3f6c644ba39271e70a357b25..396041771ff9805c006248af8d02014454a21438 100644 (file)
@@ -122,7 +122,7 @@ EOM
 sub net_merge_all_done {
        my ($self, $lei) = @_;
        $lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
-       $self->wq_close(1);
+       $self->wq_close;
        eval { redispatch_all($self, $lei) };
        warn "E: $@" if $@;
 }
index 3ec755280dc995250a1a4dac6d545b432748fbda..fd2c8a37c0aab60b8f0198ea7b7b60ed3a256b7b 100644 (file)
@@ -413,14 +413,14 @@ sub query_done { # EOF callback for main daemon
        my ($lei) = @_;
        local $PublicInbox::LEI::current_lei = $lei;
        my $l2m = delete $lei->{l2m};
-       $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
-       if (my $lxs = delete $lei->{lxs}) {
-               $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
-       }
+       delete $lei->{lxs};
        ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                warn "BUG: {sto} missing with --mail-sync";
        $lei->sto_done_request if $lei->{sto};
-       my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+       if (my $v2w = delete $lei->{v2w}) {
+               $v2w->wq_do('done');
+               $v2w->wq_close;
+       }
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress
@@ -466,7 +466,7 @@ sub do_post_augment {
        if ($err) {
                if (my $lxs = delete $lei->{lxs}) {
                        $lxs->wq_kill('-TERM');
-                       $lxs->wq_close(0, undef, $lei);
+                       $lxs->wq_close;
                }
                $lei->fail("$err");
        }
@@ -514,7 +514,7 @@ sub start_query ($$) { # always runs in main (lei-daemon) process
        if ($self->{-do_lcat}) {
                $self->wq_io_do('lcat_dump', []);
        }
-       $self->wq_close(1); # lei_xsearch workers stop when done
+       $self->wq_close; # lei_xsearch workers stop when done
 }
 
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
@@ -569,12 +569,14 @@ sub do_query {
                }
                $l2m->wq_workers_start('lei2mail', undef,
                                        $lei->oldset, { lei => $lei });
+               $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                delete $l2m->{au_peers};
        }
        $self->wq_workers_start('lei_xsearch', undef,
                                $lei->oldset, { lei => $lei });
+       $self->wq_wait_async(\&xsearch_done_wait, $lei);
        my $op_c = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
        @$end = ();