sub ipc_worker_reap { # dwaitpid callback
my ($args, $pid) = @_;
+ my ($self, @uargs) = @$args;
+ delete $self->{-wq_workers}->{$pid};
+ return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
return if !$?;
- # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
my $s = $? & 127;
- warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
+ # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+ warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
-sub wq_wait_old {
- my ($self, $cb, @args) = @_;
- my $pids = delete $self->{"-wq_old_pids.$$"} or return;
- dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids;
+sub wq_wait_async {
+ my ($self, $cb, @uargs) = @_;
+ local $PublicInbox::DS::in_loop = 1;
+ $self->{-reap_async} = 1;
+ $self->{-reap_do} = $cb;
+ my @pids = keys %{$self->{-wq_workers}};
+ dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
}
# for base class, override in sub classes
}
sub wq_close {
- my ($self, $nohang, $cb, @args) = @_;
+ my ($self) = @_;
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- my $ppid = delete $self->{-wq_ppid} or return;
- my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
- return if $ppid != $$; # can't reap siblings or parents
- my @pids = map { $_ + 0 } keys %$workers;
- if ($nohang) {
- push @{$self->{"-wq_old_pids.$$"}}, @pids;
- } else {
- $cb //= \&ipc_worker_reap;
- unshift @args, $self;
- dwaitpid($_, $cb, \@args) for @pids;
- }
-}
-
-sub wq_kill_old {
- my ($self, $sig) = @_;
- my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill($sig // 'TERM', @$pids);
+ return if $self->{-reap_async};
+ my @pids = keys %{$self->{-wq_workers}};
+ dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
}
sub wq_kill {
my ($self, $sig) = @_;
- my $workers = $self->{-wq_workers} or return;
- kill($sig // 'TERM', keys %$workers);
+ kill($sig // 'TERM', keys %{$self->{-wq_workers}});
}
sub DESTROY {
my ($self) = @_;
my $ppid = $self->{-wq_ppid};
wq_kill($self) if $ppid && $ppid == $$;
- my $err = $?;
wq_close($self);
- wq_wait_old($self);
ipc_worker_stop($self);
- $? = $err if $err;
}
sub detect_nproc () {
'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers
sub _drop_wq {
my ($self) = @_;
for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) {
- if ($wq->wq_kill('-TERM')) {
- $wq->wq_close(0, undef, $self);
- } elsif ($wq->wq_kill_old('-TERM')) {
- $wq->wq_wait_old(undef, $self);
- }
+ $wq->wq_kill('-TERM');
$wq->DESTROY;
}
}
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
+ $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
($op_c, $ops);
}
sub wait_wq_events {
my ($lei, $op_c, $ops) = @_;
for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs
- $wq->wq_close(1);
+ $wq->wq_close;
}
$op_c->{ops} = $ops;
}
if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) {
my $sig = "-$buf";
for my $wq (grep(defined, @$self{@WQ_KEYS})) {
- $wq->wq_kill($sig) or $wq->wq_kill_old($sig);
+ $wq->wq_kill($sig);
}
} else {
die "unrecognized client signal: $buf";
sub wq_eof { # EOF callback for main daemon
my ($lei) = @_;
local $current_lei = $lei;
- my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed
- $wq1->wq_wait_old($wq1->can('_wq_done_wait') // \&wq_done_wait, $lei);
+ delete $lei->{wq1} // return $lei->fail; # already failed
}
sub watch_state_ok ($) {
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_solve_blob', []);
- $self->wq_close(1);
+ $self->wq_close;
$lei->wait_wq_events($op_c, $ops);
}
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('process_inputs', []);
- $self->wq_close(1);
+ $self->wq_close;
$lei->wait_wq_events($op_c, $ops);
}
my ($lei) = @_;
my $ikw = delete $lei->{ikw} or return $lei->fail;
$lei->sto_done_request($ikw->{lei_sock});
- $ikw->wq_wait_old($lei->can('wq_done_wait'), $lei);
}
1;
sub input_only_net_merge_all_done {
my ($self) = @_;
$self->wq_io_do('process_inputs');
- $self->wq_close(1);
+ $self->wq_close;
}
# like Getopt::Long, but for +kw:FOO and -kw:FOO to prepare
$lei->{wq1} = $self;
$lei->wait_wq_events($op_c, $ops);
$self->wq_do('inspect_argv');
- $self->wq_close(1);
+ $self->wq_close;
}
sub ins_add { # InputPipe->consume callback
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_ls_search_long', [], $pfx);
- $self->wq_close(1);
+ $self->wq_close;
$lei->wait_wq_events($op_c, $ops);
}
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_mirror', []);
- $self->wq_close(1);
+ $self->wq_close;
$lei->wait_wq_events($op_c, $ops);
}
sub flush_lei ($) {
my ($lei) = @_;
my $lne = delete $lei->{cfg}->{-lei_note_event};
- $lne->wq_close(1, undef, $lei) if $lne; # runs _lei_wq_eof;
+ $lne->wq_close if $lne; # runs _lei_wq_eof;
}
# we batch up writes and flush every 5s (matching Linux default
sub _lei_wq_eof { # EOF callback for main lei daemon
my ($lei) = @_;
- my $lne = delete $lei->{lne} or return $lei->fail;
+ delete $lei->{lne} or return $lei->fail;
$lei->sto_done_request;
- $lne->wq_wait_old($lei->can('wq_done_wait'), $lei);
}
1;
my ($op_c, $ops) = $lei->workers_start($self, 1);
$lei->{wq1} = $self;
$self->wq_io_do('do_p2q', []);
- $self->wq_close(1);
+ $self->wq_close;
$lei->wait_wq_events($op_c, $ops);
}
my ($lei) = @_;
my $pmd = delete $lei->{pmd} or return $lei->fail;
$lei->sto_done_request($pmd->{lei_sock});
- $pmd->wq_wait_old($lei->can('wq_done_wait'), $lei);
}
1;
-err_wr => $w,
to_close => [ $r ],
});
+ $self->wq_wait_async; # outlives $lei
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
$dedupe->pause_dedupe if $dedupe;
}
+sub v2w_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_;
+ my ($v2w, $lei) = @$arg;
+ $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
+}
+
sub _pre_augment_v2 {
my ($self, $lei) = @_;
my $dir = $self->{dst};
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
- $v2w->{-wq_no_bcast} = 1;
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+ $v2w->wq_wait_async(\&v2w_done_wait, $lei);
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
$self->{dst}, \$self->{-au_noted});
}
$self->wq_broadcast('do_post_auth');
- $self->wq_close(1);
+ $self->wq_close;
}
1;
sub net_merge_all_done {
my ($self, $lei) = @_;
$lei->{net} = delete($self->{-net_new}) if $self->{-net_new};
- $self->wq_close(1);
+ $self->wq_close;
eval { redispatch_all($self, $lei) };
warn "E: $@" if $@;
}
my ($lei) = @_;
local $PublicInbox::LEI::current_lei = $lei;
my $l2m = delete $lei->{l2m};
- $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m;
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_wait_old(\&xsearch_done_wait, $lei);
- }
+ delete $lei->{lxs};
($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
warn "BUG: {sto} missing with --mail-sync";
$lei->sto_done_request if $lei->{sto};
- my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
+ if (my $v2w = delete $lei->{v2w}) {
+ $v2w->wq_do('done');
+ $v2w->wq_close;
+ }
$lei->{ovv}->ovv_end($lei);
my $start_mua;
if ($l2m) { # close() calls LeiToMail reap_compress
if ($err) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill('-TERM');
- $lxs->wq_close(0, undef, $lei);
+ $lxs->wq_close;
}
$lei->fail("$err");
}
if ($self->{-do_lcat}) {
$self->wq_io_do('lcat_dump', []);
}
- $self->wq_close(1); # lei_xsearch workers stop when done
+ $self->wq_close; # lei_xsearch workers stop when done
}
sub incr_start_query { # called whenever an l2m shard starts do_post_auth
}
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
+ $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
+ $self->wq_wait_async(\&xsearch_done_wait, $lei);
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();