]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ipc+lei: switch to awaitpid
authorEric Wong <e@80x24.org>
Tue, 17 Jan 2023 07:19:10 +0000 (07:19 +0000)
committerEric Wong <e@80x24.org>
Wed, 18 Jan 2023 23:26:03 +0000 (23:26 +0000)
This avoids awkwardly stuffing an arrayref into callbacks
which expect multiple arguments.  IPC->awaitpid_init now
allows pre-registering callbacks before spawning workers.

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiUp.pm
lib/PublicInbox/LeiXSearch.pm
script/public-inbox-clone

index 34e401185e92906e9e3762e849c86783412c2496..edc5ba6486f12a888432d3828fdd4e7f354cff75 100644 (file)
@@ -12,7 +12,7 @@ use strict;
 use v5.10.1;
 use parent qw(Exporter);
 use Carp qw(croak);
-use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
@@ -133,26 +133,26 @@ sub ipc_worker_spawn {
        $self->{-ipc_req} = $w_req;
        $self->{-ipc_res} = $r_res;
        $self->{-ipc_ppid} = $$;
+       awaitpid($pid, \&ipc_worker_reap, $self);
        $self->{-ipc_pid} = $pid;
 }
 
-sub ipc_worker_reap { # dwaitpid callback
-       my ($args, $pid) = @_;
-       my ($self, @uargs) = @$args;
+sub ipc_worker_reap { # awaitpid callback
+       my ($pid, $self) = @_;
        delete $self->{-wq_workers}->{$pid};
-       return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
+       if (my $cb_args = $self->{-reap_do}) {
+               return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
+       }
        return if !$?;
        my $s = $? & 127;
        # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
        warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
 }
 
-sub wq_wait_async {
-       my ($self, $cb, @uargs) = @_;
-       local $PublicInbox::DS::in_loop = 1;
-       $self->{-reap_do} = $cb;
-       my @pids = keys %{$self->{-wq_workers}};
-       dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
+# register wait workers
+sub awaitpid_init {
+       my ($self, @cb_args) = @_;
+       $self->{-reap_do} = \@cb_args;
 }
 
 # for base class, override in sub classes
@@ -178,9 +178,7 @@ sub ipc_worker_stop {
        }
        die 'no PID with IPC pipes' unless $pid;
        $w_req = $r_res = undef;
-
-       return if $$ != $ppid;
-       dwaitpid($pid, \&ipc_worker_reap, [$self]);
+       awaitpid($pid) if $$ == $ppid; # for non-event loop
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
@@ -397,6 +395,7 @@ sub _wq_worker_start ($$$$) {
                undef $end; # trigger exit
        } else {
                $self->{-wq_workers}->{$pid} = $bcast1;
+               awaitpid($pid, \&ipc_worker_reap, $self);
        }
 }
 
@@ -428,8 +427,7 @@ sub wq_close {
        }
        delete @$self{qw(-wq_s1 -wq_s2)} or return;
        return if $self->{-reap_do};
-       my @pids = keys %{$self->{-wq_workers}};
-       dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
+       awaitpid($_) for keys %{$self->{-wq_workers}};
 }
 
 sub wq_kill {
index b78d70de95f4e1ef7d616fd33d55f58d62e43d0c..6ad421111db6bf74e157f9e1816179a6a3ab8bc6 100644 (file)
@@ -18,7 +18,6 @@ use IO::Handle ();
 use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::Lock;
 use PublicInbox::Eml;
@@ -644,12 +643,12 @@ sub workers_start {
        my $end = $lei->pkt_op_pair;
        my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
        $flds->{lei} = $lei;
+       $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
        delete $lei->{pkt_op_p};
        my $op_c = delete $lei->{pkt_op_c};
        @$end = ();
        $lei->event_step_init;
-       $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
        ($op_c, $ops);
 }
 
@@ -1391,9 +1390,8 @@ sub DESTROY {
        # preserve $? for ->fail or ->x_it code
 }
 
-sub wq_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+       my ($pid, $wq, $lei) = @_;
        local $current_lei = $lei;
        my $err_type = $lei->{-err_type};
        $? and $lei->child_error($?,
index 59af40dea1b11a64ca8e074412c978ab1e98020d..1acd4558ff0b89cc2ac01afd07638bccf0c5af41 100644 (file)
@@ -30,7 +30,7 @@ sub input_maildir_cb {
 
 sub process_inputs { # via wq_do
        my ($self) = @_;
-       local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
+       local $PublicInbox::DS::in_loop = 0; # force synchronous awaitpid
        $self->SUPER::process_inputs;
        my $lei = $self->{lei};
        delete $lei->{1};
index a1dcc907252f47d7716a83e799e2912a34be2416..c258f824e8325101c80e55362ea6368b27434358 100644 (file)
@@ -177,7 +177,7 @@ sub input_path_url {
                        $mbl->{fh} =
                             PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
                }
-               local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid
+               local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid
                $self->input_fh($ifmt, $mbl->{fh}, $input, @args);
        } elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
                return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
index 87abf88c8e1a03f4559b7d0f0251019afb52b7f5..abf663159855e3818f1e484d6d7ffe7060680102 100644 (file)
@@ -31,9 +31,8 @@ sub keep_going ($) {
                $_[0]->{lei}->{opt}->{'keep-going'});
 }
 
-sub _wq_done_wait { # dwaitpid callback (via wq_eof)
-       my ($arg, $pid) = @_;
-       my ($mrr, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+       my ($pid, $mrr, $lei) = @_;
        if ($?) {
                $lei->child_error($?);
        } elsif (!$lei->{child_error}) {
@@ -236,7 +235,7 @@ sub index_cloned_inbox {
                        my ($k) = ($sw =~ /\A([\w-]+)/);
                        $opt->{$k} = $lei->{opt}->{$k};
                }
-               # force synchronous dwaitpid for v2:
+               # force synchronous awaitpid for v2:
                local $PublicInbox::DS::in_loop = 0;
                my $cfg = PublicInbox::Config->new(undef, $lei->{2});
                my $env = PublicInbox::Admin::index_prepare($opt, $cfg);
index 57f0e0132f4316fa3a97f0b1dc56b99eaa8a3969..0ecf138876e4faa4e88d5615790b77a6aa331ad5 100644 (file)
@@ -604,9 +604,8 @@ sub recv_and_run {
        $self->SUPER::recv_and_run(@args);
 }
 
-sub _sto_atexit { # dwaitpid callback
-       my ($args, $pid) = @_;
-       my $self = $args->[0];
+sub _sto_atexit { # awaitpid cb (via awaitpid_init)
+       my ($pid, $sto) = @_;
        warn "lei/store PID:$pid died \$?=$?\n" if $?;
 }
 
@@ -621,12 +620,12 @@ sub write_prepare {
                # Mail we import into lei are private, so headers filtered out
                # by -mda for public mail are not appropriate
                local @PublicInbox::MDA::BAD_HEADERS = ();
+               $self->awaitpid_init(\&_sto_atexit); # outlives $lei
                $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
                                        lei => $lei,
                                        -err_wr => $w,
                                        to_close => [ $r ],
                                });
-               $self->wq_wait_async(\&_sto_atexit); # outlives $lei
                require PublicInbox::LeiStoreErr;
                PublicInbox::LeiStoreErr->new($r, $lei);
        }
index 1528165a7e00f9571332062c27934c09058b2418..6a4554e76d323e333af5ab128741fca5c649f913 100644 (file)
@@ -652,9 +652,8 @@ sub _do_augment_mbox {
        $dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($v2w, $lei) = @$arg;
+sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+       my ($pid, $v2w, $lei) = @_;
        $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
 }
 
@@ -680,8 +679,8 @@ sub _pre_augment_v2 {
        PublicInbox::InboxWritable->new($ibx, @creat);
        $ibx->init_inbox if @creat;
        my $v2w = $ibx->importer;
+       $v2w->awaitpid_init(\&v2w_done_wait, $lei);
        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
-       $v2w->wq_wait_async(\&v2w_done_wait, $lei);
        $lei->{v2w} = $v2w;
        return if !$lei->{opt}->{shared};
        my $d = "$lei->{ale}->{git}->{git_dir}/objects";
index 499173393ca6ff3f43a017417945f8d993618c82..3e92242ec044039aac8f1b91664c85e85cdf6394 100644 (file)
@@ -165,9 +165,8 @@ sub _complete_up { # lei__complete hook
        map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
 }
 
-sub _wq_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($wq, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+       my ($pid, $wq, $lei) = @_;
        $lei->child_error($?, 'auth failure') if $?
 }
 
index 730df1f772f12da20821b3546e4e459b57419689..f9aa870e0346247acbde426b7bda894105a38725 100644 (file)
@@ -400,9 +400,8 @@ sub query_remote_mboxrd {
 
 sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
 
-sub xsearch_done_wait { # dwaitpid callback
-       my ($arg, $pid) = @_;
-       my ($wq, $lei) = @$arg;
+sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+       my ($pid, $wq, $lei) = @_;
        return if !$?;
        my $s = $? & 127;
        return $lei->child_error($?) if $s == 13 || $s == 15;
@@ -573,16 +572,16 @@ sub do_query {
                        fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                        $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
                }
+               $l2m->awaitpid_init(\&xsearch_done_wait, $lei);
                $l2m->wq_workers_start('lei2mail', undef,
                                        $lei->oldset, { lei => $lei });
-               $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                delete $l2m->{au_peers};
        }
+       $self->awaitpid_init(\&xsearch_done_wait, $lei);
        $self->wq_workers_start('lei_xsearch', undef,
                                $lei->oldset, { lei => $lei });
-       $self->wq_wait_async(\&xsearch_done_wait, $lei);
        my $op_c = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
        @$end = ();
index e93ac37b2fb9ba2433678d574d6a925b62751823..598979bc1a467a0e308e1278a98e6e93f154bf36 100755 (executable)
@@ -62,5 +62,5 @@ my $mrr = bless {
 
 $? = 0;
 $mrr->do_mirror;
-$mrr->can('_wq_done_wait')->([$mrr, $lei], $$);
+$mrr->can('_wq_done_wait')->($$, $mrr, $lei);
 exit(($lei->{child_error} // 0) >> 8);