]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei_mirror: rely on global process reaper
authorEric Wong <e@80x24.org>
Mon, 28 Nov 2022 05:31:03 +0000 (05:31 +0000)
committerEric Wong <e@80x24.org>
Mon, 28 Nov 2022 23:38:53 +0000 (23:38 +0000)
We no longer rely on SIGCHLD for predictability, and instead
call waitpid at safe points.  This will make it easier for us to
do parallel mirroring of multiple inboxes while preserving
proper dependencies via ->DESTROY callbacks.

lib/PublicInbox/LeiMirror.pm

index 0603dd4839d4d897b50c5db44d09c8a62357432a..7dc47ab812404d78a8bdfb27af39de154191cddb 100644 (file)
@@ -14,7 +14,7 @@ use PublicInbox::Spawn qw(popen_rd spawn);
 use File::Temp ();
 use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY);
 use Carp qw(croak);
-use POSIX qw(WNOHANG);
+our %LIVE;
 
 sub _wq_done_wait { # dwaitpid callback (via wq_eof)
        my ($arg, $pid) = @_;
@@ -61,7 +61,9 @@ sub try_scrape {
                        my ($n) = (m!/([0-9]+)\z!);
                        $n => [ URI->new($_), '' ]
                } @v2_urls; # uniq
-               return clone_v2($self, \%v2_epochs);
+               clone_v2($self, \%v2_epochs);
+               reap_live() while keys(%LIVE);
+               return;
        }
 
        # filter out common URLs served by WWW (e.g /$MSGID/T/)
@@ -311,16 +313,16 @@ EOM
 }
 
 sub reap_clone { # async, called via SIGCHLD
-       my ($lei, $cmd, $live) = @_;
+       my ($lei, $cmd) = @_;
        my $cerr = $?;
        $? = 0; # don't let it influence normal exit
        if ($cerr) {
-               kill('TERM', keys %$live);
+               kill('TERM', keys %LIVE);
                $lei->child_error($cerr, "@$cmd failed");
        }
 }
 
-sub v2_done {
+sub v2_done { # called via OnDestroy
        my ($self) = @_;
        require PublicInbox::MultiGit;
        my $dst = $self->{cur_dst} // $self->{dst};
@@ -336,6 +338,16 @@ sub v2_done {
        index_cloned_inbox($self, 2);
 }
 
+sub reap_live {
+       my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+       if (my $x = delete $LIVE{$pid}) {
+               my $cb = shift @$x;
+               $cb->(@$x);
+       } else {
+               warn "reaped unknown PID=$pid ($?)\n";
+       }
+}
+
 sub clone_v2 ($$;$) {
        my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref
        my $lei = $self->{lei};
@@ -366,37 +378,21 @@ failed to extract epoch number from $src
        # filter out the epochs we skipped
        $self->{-culled_manifest} = 1 if delete(@$m{@skip});
        my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
-       my %live;
        my $fini = PublicInbox::OnDestroy->new($$, \&v2_done, $task);
-       $live{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ];
+       $LIVE{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ];
        $task->{-locked} = $lk->lock_for_scope($$);
        my @cmd = clone_cmd($lei, my $opt = {});
        my $jobs = $self->{lei}->{opt}->{jobs} // 2;
-       my $sigchld = sub {
-               my ($sig) = @_;
-               my $flags = $sig ? WNOHANG : 0;
-               while (1) {
-                       my $pid = waitpid(-1, $flags) or return;
-                       return if $pid < 0;
-                       if (my $x = delete $live{$pid}) {
-                               my $cb = shift @$x;
-                               $cb->(@$x, \%live);
-                       } else {
-                               warn "reaped unknown PID=$pid ($?)\n";
-                       }
-               }
-       };
        do {
-               $sigchld->(0) while keys(%live) >= $jobs;
-               while (keys(%live) < $jobs && @src_edst &&
+               reap_live() while keys(%LIVE) >= $jobs;
+               while (keys(%LIVE) < $jobs && @src_edst &&
                                !$lei->{child_error}) {
                        my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ];
                        $lei->qerr("# @$cmd");
-                       my $pid = spawn($cmd, undef, $opt);
-                       $live{$pid} = [ \&reap_clone, $lei, $cmd, $fini ];
+                       $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone,
+                                                       $lei, $cmd, $fini ];
                }
        } while (@src_edst && !$lei->{child_error});
-       $sigchld->(0) while keys(%live);
 }
 
 sub decode_manifest ($$$) {
@@ -487,6 +483,7 @@ sub try_manifest {
        my $opt = { -C => $pdir };
        $opt->{$_} = $lei->{$_} for (0..2);
        my $cerr = run_reap($lei, $cmd, $opt);
+       local %LIVE;
        if ($cerr) {
                return try_scrape($self) if ($cerr >> 8) == 22; # 404 missing
                return $lei->child_error($cerr, "@$cmd failed");
@@ -498,6 +495,7 @@ sub try_manifest {
        }
        my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m);
        return $lei->child_error(1, $multi) if !ref($multi);
+       my $jobs = $self->{lei}->{opt}->{jobs} // 2;
        if (my $v2 = delete $multi->{v2}) {
                for my $name (sort keys %$v2) {
                        my $epochs = delete $v2->{$name};
@@ -520,6 +518,8 @@ sub try_manifest {
 E: `$self->{cur_dst}' must not contain newline
 EOM
                        clone_v2($self, \%v2_epochs, $m);
+                       reap_live() while keys(%LIVE) >= $jobs;
+                       return if $self->{lei}->{child_error};
                }
        }
        if (my $v1 = delete $multi->{v1}) {
@@ -540,6 +540,7 @@ EOM
                        clone_v1($self, 1);
                }
        }
+       reap_live() while keys(%LIVE);
        if (delete $self->{-culled_manifest}) { # set by clone_v2/-I/--exclude
                # write the smaller manifest if epochs were skipped so
                # users won't have to delete manifest if they +w an
@@ -566,6 +567,7 @@ sub do_mirror { # via wq_io_do
        eval {
                my $iv = $lei->{opt}->{'inbox-version'};
                if (defined $iv) {
+                       local %LIVE;
                        return clone_v1($self) if $iv == 1;
                        return try_scrape($self) if $iv == 2;
                        die "bad --inbox-version=$iv\n";