Simplify our APIs and force dwaitpid() to work in async mode for
all lei workers. This avoids having lingering zombies for
parallel searches if one worker finishes soon before another.
The old distinction between "old" and "new" workers was
needlessly complex, error-prone, and embarrasingly bad.
We also never handled v2:// writers properly before on
Ctrl-C/Ctrl-Z (SIGINT/SIGTSTP), so add them to @WQ_KEYS
to ensure they get handled by $lei when appropropriate.
16 files changed:
sub ipc_worker_reap { # dwaitpid callback
my ($args, $pid) = @_;
sub ipc_worker_reap { # dwaitpid callback
my ($args, $pid) = @_;
+ my ($self, @uargs) = @$args;
+ delete $self->{-wq_workers}->{$pid};
+ return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
- # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
- warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
+ # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+ warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
-sub wq_wait_old {
- my ($self, $cb, @args) = @_;
- my $pids = delete $self->{"-wq_old_pids.$$"} or return;
- dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
+sub wq_wait_async {
+ my ($self, $cb, @uargs) = @_;
+ local $PublicInbox::DS::in_loop = 1;
+ $self->{-reap_async} = 1;
+ $self->{-reap_do} = $cb;
+ my @pids = keys %{$self->{-wq_workers}};
+ dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
}
# for base class, override in sub classes
}
# for base class, override in sub classes
- my ($self, $nohang, $cb, @args) = @_;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- my $ppid = delete $self->{-wq_ppid} or return;
- my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
- return if $ppid != $$; # can't reap siblings or parents
- my @pids = map { $_ + 0 } keys %$workers;
- if ($nohang) {
- push @{$self->{"-wq_old_pids.$$"}}, @pids;
- } else {
- $cb //= \&ipc_worker_reap;
- unshift @args, $self;
- dwaitpid($_, $cb, \@args) for @pids;
- }
-}
-
-sub wq_kill_old {
- my ($self, $sig) = @_;
- my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill($sig // 'TERM', @$pids);
+ return if $self->{-reap_async};
+ my @pids = keys %{$self->{-wq_workers}};
+ dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
}
sub wq_kill {
my ($self, $sig) = @_;
}
sub wq_kill {
my ($self, $sig) = @_;
- my $workers = $self->{-wq_workers} or return;
- kill($sig // 'TERM', keys %$workers);
+ kill($sig // 'TERM', keys %{$self->{-wq_workers}});
}
sub DESTROY {
my ($self) = @_;
my $ppid = $self->{-wq_ppid};
wq_kill($self) if $ppid && $ppid == $$;
}
sub DESTROY {
my ($self) = @_;
my $ppid = $self->{-wq_ppid};
wq_kill($self) if $ppid && $ppid == $$;
'leistore.dir' => 'top-level storage location',
);
'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
sub _drop_wq {
my ($self) = @_;
for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
sub _drop_wq {
my ($self) = @_;
for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
- if ($wq->wq_kill('-TERM')) {
- $wq->wq_close(0, undef, $self);
- } elsif ($wq->wq_kill_old('-TERM')) {
- $wq->wq_wait_old(undef, $self);
- }
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
+ $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
my $sig = "-$buf";
for my $wq (grep(defined, @$self{@WQ_KEYS})) {
if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
my $sig = "-$buf";
for my $wq (grep(defined, @$self{@WQ_KEYS})) {
- $wq->wq_kill($sig) or $wq->wq_kill_old($sig);
}
} else {
die "unrecognized client signal: $buf";
}
} else {
die "unrecognized client signal: $buf";
sub wq_eof { # EOF callback for main daemon
my ($lei) = @_;
local $current_lei = $lei;
sub wq_eof { # EOF callback for main daemon
my ($lei) = @_;
local $current_lei = $lei;
- my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
- $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+ delete $lei->{wq1} // return $lei->fail; # already failed
}
sub watch_state_ok ($) {
}
sub watch_state_ok ($) {
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_solve_blob', []);
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_solve_blob', []);
$lei->wait_wq_events($op_c, $ops);
}
$lei->wait_wq_events($op_c, $ops);
}
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('process_inputs', []);
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('process_inputs', []);
$lei->wait_wq_events($op_c, $ops);
}
$lei->wait_wq_events($op_c, $ops);
}
my ($lei) = @_;
my $ikw = delete $lei->{ikw} or return $lei->fail;
$lei->sto_done_request($ikw->{lei_sock});
my ($lei) = @_;
my $ikw = delete $lei->{ikw} or return $lei->fail;
$lei->sto_done_request($ikw->{lei_sock});
- $ikw->wq_wait_old($lei->can('wq_done_wait'), $lei);
sub input_only_net_merge_all_done {
my ($self) = @_;
$self->wq_io_do('process_inputs');
sub input_only_net_merge_all_done {
my ($self) = @_;
$self->wq_io_do('process_inputs');
}
# like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
}
# like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
$lei->{wq1} = $self;
$lei->wait_wq_events($op_c, $ops);
$self->wq_do('inspect_argv');
$lei->{wq1} = $self;
$lei->wait_wq_events($op_c, $ops);
$self->wq_do('inspect_argv');
}
sub ins_add { # InputPipe->consume callback
}
sub ins_add { # InputPipe->consume callback
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_ls_search_long', [], $pfx);
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_ls_search_long', [], $pfx);
$lei->wait_wq_events($op_c, $ops);
}
$lei->wait_wq_events($op_c, $ops);
}
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_mirror', []);
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_mirror', []);
$lei->wait_wq_events($op_c, $ops);
}
$lei->wait_wq_events($op_c, $ops);
}
sub flush_lei ($) {
my ($lei) = @_;
my $lne = delete $lei->{cfg}->{-lei_note_event};
sub flush_lei ($) {
my ($lei) = @_;
my $lne = delete $lei->{cfg}->{-lei_note_event};
- $lne->wq_close(1, undef, $lei) if $lne; # runs _lei_wq_eof;
+ $lne->wq_close if $lne; # runs _lei_wq_eof;
}
# we batch up writes and flush every 5s (matching Linux default
}
# we batch up writes and flush every 5s (matching Linux default
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
- my $lne = delete $lei->{lne} or return $lei->fail;
+ delete $lei->{lne} or return $lei->fail;
- $lne->wq_wait_old($lei->can('wq_done_wait'), $lei);
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_p2q', []);
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_p2q', []);
$lei->wait_wq_events($op_c, $ops);
}
$lei->wait_wq_events($op_c, $ops);
}
my ($lei) = @_;
my $pmd = delete $lei->{pmd} or return $lei->fail;
$lei->sto_done_request($pmd->{lei_sock});
my ($lei) = @_;
my $pmd = delete $lei->{pmd} or return $lei->fail;
$lei->sto_done_request($pmd->{lei_sock});
- $pmd->wq_wait_old($lei->can('wq_done_wait'), $lei);
-err_wr => $w,
to_close => [ $r ],
});
-err_wr => $w,
to_close => [ $r ],
});
+ $self->wq_wait_async; # outlives $lei
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
$dedupe->pause_dedupe if $dedupe;
}
$dedupe->pause_dedupe if $dedupe;
}
+sub v2w_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_;
+ my ($v2w, $lei) = @$arg;
+ $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
+}
+
sub _pre_augment_v2 {
my ($self, $lei) = @_;
my $dir = $self->{dst};
sub _pre_augment_v2 {
my ($self, $lei) = @_;
my $dir = $self->{dst};
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
- $v2w->{-wq_no_bcast} = 1;
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+ $v2w->wq_wait_async(\&v2w_done_wait, $lei);
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
$self->{dst}, \$self->{-au_noted});
}
$self->wq_broadcast('do_post_auth');
$self->{dst}, \$self->{-au_noted});
}
$self->wq_broadcast('do_post_auth');
sub net_merge_all_done {
my ($self, $lei) = @_;
$lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
sub net_merge_all_done {
my ($self, $lei) = @_;
$lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
eval { redispatch_all($self, $lei) };
warn "E: $@" if $@;
}
eval { redispatch_all($self, $lei) };
warn "E: $@" if $@;
}
my ($lei) = @_;
local $PublicInbox::LEI::current_lei = $lei;
my $l2m = delete $lei->{l2m};
my ($lei) = @_;
local $PublicInbox::LEI::current_lei = $lei;
my $l2m = delete $lei->{l2m};
- $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
- }
($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
warn "BUG: {sto} missing with --mail-sync";
$lei->sto_done_request if $lei->{sto};
($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
warn "BUG: {sto} missing with --mail-sync";
$lei->sto_done_request if $lei->{sto};
- my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+ if (my $v2w = delete $lei->{v2w}) {
+ $v2w->wq_do('done');
+ $v2w->wq_close;
+ }
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
if ($err) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill('-TERM');
if ($err) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill('-TERM');
- $lxs->wq_close(0, undef, $lei);
if ($self->{-do_lcat}) {
$self->wq_io_do('lcat_dump', []);
}
if ($self->{-do_lcat}) {
$self->wq_io_do('lcat_dump', []);
}
- $self->wq_close(1); # lei_xsearch workers stop when done
+ $self->wq_close; # lei_xsearch workers stop when done
}
sub incr_start_query { # called whenever an l2m shard starts do_post_auth
}
sub incr_start_query { # called whenever an l2m shard starts do_post_auth
}
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
}
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
+ $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
+ $self->wq_wait_async(\&xsearch_done_wait, $lei);
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();