lib/PublicInbox/IPC.pm | 30 ++++++++++++++---------------- lib/PublicInbox/LEI.pm | 8 +++----- lib/PublicInbox/LeiConvert.pm | 2 +- lib/PublicInbox/LeiInput.pm | 2 +- lib/PublicInbox/LeiMirror.pm | 7 +++---- lib/PublicInbox/LeiStore.pm | 7 +++---- lib/PublicInbox/LeiToMail.pm | 7 +++---- lib/PublicInbox/LeiUp.pm | 5 ++--- lib/PublicInbox/LeiXSearch.pm | 9 ++++----- script/public-inbox-clone | 2 +- diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 34e401185e92906e9e3762e849c86783412c2496..edc5ba6486f12a888432d3828fdd4e7f354cff75 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -12,7 +12,7 @@ use strict; use v5.10.1; use parent qw(Exporter); use Carp qw(croak); -use PublicInbox::DS qw(dwaitpid); +use PublicInbox::DS qw(awaitpid); use PublicInbox::Spawn; use PublicInbox::OnDestroy; use PublicInbox::WQWorker; @@ -133,26 +133,26 @@ $w_req->autoflush(1); $self->{-ipc_req} = $w_req; $self->{-ipc_res} = $r_res; $self->{-ipc_ppid} = $$; + awaitpid($pid, \&ipc_worker_reap, $self); $self->{-ipc_pid} = $pid; } -sub ipc_worker_reap { # dwaitpid callback - my ($args, $pid) = @_; - my ($self, @uargs) = @$args; +sub ipc_worker_reap { # awaitpid callback + my ($pid, $self) = @_; delete $self->{-wq_workers}->{$pid}; - return $self->{-reap_do}->($args, $pid) if $self->{-reap_do}; + if (my $cb_args = $self->{-reap_do}) { + return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]); + } return if !$?; my $s = $? & 127; # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13 } -sub wq_wait_async { - my ($self, $cb, @uargs) = @_; - local $PublicInbox::DS::in_loop = 1; - $self->{-reap_do} = $cb; - my @pids = keys %{$self->{-wq_workers}}; - dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids; +# register wait workers +sub awaitpid_init { + my ($self, @cb_args) = @_; + $self->{-reap_do} = \@cb_args; } # for base class, override in sub classes @@ -178,9 +178,7 @@ return; # idempotent } die 'no PID with IPC pipes' unless $pid; $w_req = $r_res = undef; - - return if $$ != $ppid; - dwaitpid($pid, \&ipc_worker_reap, [$self]); + awaitpid($pid) if $$ == $ppid; # for non-event loop } # use this if we have multiple readers reading curl or "pigz -dc" @@ -397,6 +395,7 @@ warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $end; # trigger exit } else { $self->{-wq_workers}->{$pid} = $bcast1; + awaitpid($pid, \&ipc_worker_reap, $self); } } @@ -428,8 +427,7 @@ $wqb->enq_close; } delete @$self{qw(-wq_s1 -wq_s2)} or return; return if $self->{-reap_do}; - my @pids = keys %{$self->{-wq_workers}}; - dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids; + awaitpid($_) for keys %{$self->{-wq_workers}}; } sub wq_kill { diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index b78d70de95f4e1ef7d616fd33d55f58d62e43d0c..6ad421111db6bf74e157f9e1816179a6a3ab8bc6 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -18,7 +18,6 @@ use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; use PublicInbox::Eml; @@ -644,12 +643,12 @@ $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; my $end = $lei->pkt_op_pair; my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; $flds->{lei} = $lei; + $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; - $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); ($op_c, $ops); } @@ -1391,9 +1390,8 @@ dump_and_clear_log(); # preserve $? for ->fail or ->x_it code } -sub wq_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($wq, $lei) = @$arg; +sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init) + my ($pid, $wq, $lei) = @_; local $current_lei = $lei; my $err_type = $lei->{-err_type}; $? and $lei->child_error($?, diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 59af40dea1b11a64ca8e074412c978ab1e98020d..1acd4558ff0b89cc2ac01afd07638bccf0c5af41 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -30,7 +30,7 @@ } sub process_inputs { # via wq_do my ($self) = @_; - local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid + local $PublicInbox::DS::in_loop = 0; # force synchronous awaitpid $self->SUPER::process_inputs; my $lei = $self->{lei}; delete $lei->{1}; diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index a1dcc907252f47d7716a83e799e2912a34be2416..c258f824e8325101c80e55362ea6368b27434358 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -177,7 +177,7 @@ my $in = delete $mbl->{fh}; $mbl->{fh} = PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei); } - local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid + local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid $self->input_fh($ifmt, $mbl->{fh}, $input, @args); } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) { return $lei->fail(<{lei}->{child_error} || $_[0]->{lei}->{opt}->{'keep-going'}); } -sub _wq_done_wait { # dwaitpid callback (via wq_eof) - my ($arg, $pid) = @_; - my ($mrr, $lei) = @$arg; +sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init) + my ($pid, $mrr, $lei) = @_; if ($?) { $lei->child_error($?); } elsif (!$lei->{child_error}) { @@ -236,7 +235,7 @@ for my $sw ($lei->index_opt) { my ($k) = ($sw =~ /\A([\w-]+)/); $opt->{$k} = $lei->{opt}->{$k}; } - # force synchronous dwaitpid for v2: + # force synchronous awaitpid for v2: local $PublicInbox::DS::in_loop = 0; my $cfg = PublicInbox::Config->new(undef, $lei->{2}); my $env = PublicInbox::Admin::index_prepare($opt, $cfg); diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 57f0e0132f4316fa3a97f0b1dc56b99eaa8a3969..0ecf138876e4faa4e88d5615790b77a6aa331ad5 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -604,9 +604,8 @@ local $PublicInbox::DS::in_loop = 0; # waitpid synchronously $self->SUPER::recv_and_run(@args); } -sub _sto_atexit { # dwaitpid callback - my ($args, $pid) = @_; - my $self = $args->[0]; +sub _sto_atexit { # awaitpid cb (via awaitpid_init) + my ($pid, $sto) = @_; warn "lei/store PID:$pid died \$?=$?\n" if $?; } @@ -621,12 +620,12 @@ $w->autoflush(1); # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); + $self->awaitpid_init(\&_sto_atexit); # outlives $lei $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, { lei => $lei, -err_wr => $w, to_close => [ $r ], }); - $self->wq_wait_async(\&_sto_atexit); # outlives $lei require PublicInbox::LeiStoreErr; PublicInbox::LeiStoreErr->new($r, $lei); } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 1528165a7e00f9571332062c27934c09058b2418..6a4554e76d323e333af5ab128741fca5c649f913 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -652,9 +652,8 @@ seek($out, 0, SEEK_END) or die "seek $dst: $!"; $dedupe->pause_dedupe if $dedupe; } -sub v2w_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($v2w, $lei) = @$arg; +sub v2w_done_wait { # awaitpid cb (via awaitpid_init) + my ($pid, $v2w, $lei) = @_; $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; } @@ -680,8 +679,8 @@ } PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; + $v2w->awaitpid_init(\&v2w_done_wait, $lei); $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); - $v2w->wq_wait_async(\&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm index 499173393ca6ff3f43a017417945f8d993618c82..3e92242ec044039aac8f1b91664c85e85cdf6394 100644 --- a/lib/PublicInbox/LeiUp.pm +++ b/lib/PublicInbox/LeiUp.pm @@ -165,9 +165,8 @@ my $match_cb = $lei->complete_url_prepare(\@argv); map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei); } -sub _wq_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($wq, $lei) = @$arg; +sub _wq_done_wait { # awaitpid cb (via awaitpid_init) + my ($pid, $wq, $lei) = @_; $lei->child_error($?, 'auth failure') if $? } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 730df1f772f12da20821b3546e4e459b57419689..f9aa870e0346247acbde426b7bda894105a38725 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -400,9 +400,8 @@ } sub git { $_[0]->{git} // die 'BUG: git uninitialized' } -sub xsearch_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($wq, $lei) = @$arg; +sub xsearch_done_wait { # awaitpid cb (via awaitpid_init) + my ($pid, $wq, $lei) = @_; return if !$?; my $s = $? & 127; return $lei->child_error($?) if $s == 13 || $s == 15; @@ -573,16 +572,16 @@ pipe(my ($b_r, $b_w)) or die "pipe: $!"; fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ]; } + $l2m->awaitpid_init(\&xsearch_done_wait, $lei); $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); - $l2m->wq_wait_async(\&xsearch_done_wait, $lei); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; } + $self->awaitpid_init(\&xsearch_done_wait, $lei); $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); - $self->wq_wait_async(\&xsearch_done_wait, $lei); my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); diff --git a/script/public-inbox-clone b/script/public-inbox-clone index e93ac37b2fb9ba2433678d574d6a925b62751823..598979bc1a467a0e308e1278a98e6e93f154bf36 100755 --- a/script/public-inbox-clone +++ b/script/public-inbox-clone @@ -62,5 +62,5 @@ }, 'PublicInbox::LeiMirror'; $? = 0; $mrr->do_mirror; -$mrr->can('_wq_done_wait')->([$mrr, $lei], $$); +$mrr->can('_wq_done_wait')->($$, $mrr, $lei); exit(($lei->{child_error} // 0) >> 8);