From: Eric Wong Date: Mon, 28 Nov 2022 05:31:00 +0000 (+0000) Subject: clone: parallelize v2 epoch clones X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=commitdiff_plain;h=f6002395d2c62dfac2c6fbbd7e8428bb2467a036 clone: parallelize v2 epoch clones This is a first step in supporting completely parallelized clones. Eventually, everything will be parallelized and dependencies will be managed via callbacks. --- diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index d5017642..a8aa6a9f 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -14,6 +14,7 @@ use PublicInbox::Spawn qw(popen_rd spawn); use File::Temp (); use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY); use Carp qw(croak); +use POSIX qw(WNOHANG); sub _wq_done_wait { # dwaitpid callback (via wq_eof) my ($arg, $pid) = @_; @@ -283,6 +284,16 @@ EOM close $fh or die "close:($f): $!"; } +sub reap_clone { # async, called via SIGCHLD + my ($lei, $cmd, $live) = @_; + my $cerr = $?; + $? = 0; # don't let it influence normal exit + if ($cerr) { + kill('TERM', keys %$live); + $lei->child_error($cerr, "@$cmd failed"); + } +} + sub clone_v2 ($$;$) { my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref my $lei = $self->{lei}; @@ -312,14 +323,41 @@ failed to extract epoch number from $src # filter out the epochs we skipped $self->{-culled_manifest} = 1 if delete(@$m{@skip}); my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock'; - _try_config($self); + my $task = $m ? bless { %$self }, __PACKAGE__ : $self; + + _try_config($task); my $on_destroy = $lk->lock_for_scope($$); my @cmd = clone_cmd($lei, my $opt = {}); - while (my ($src, $edst) = splice(@src_edst, 0, 2)) { - my $cmd = [ @$pfx, @cmd, $src, $edst ]; - my $cerr = run_reap($lei, $cmd, $opt); - return $lei->child_error($cerr, "@$cmd failed") if $cerr; - } + my $jobs = $self->{lei}->{opt}->{jobs} // 2; + my %live; + my $sigchld = sub { + my ($sig) = @_; + my $flags = $sig ? WNOHANG : 0; + while (1) { + my $pid = waitpid(-1, $flags) or return; + return if $pid < 0; + if (my $x = delete $live{$pid}) { + my $cb = shift @$x; + $cb->(@$x, \%live); + } else { + warn "reaped unknown PID=$pid ($?)\n"; + } + } + }; + do { + $sigchld->(0) while keys(%live) >= $jobs; + local $SIG{CHLD} = $sigchld; + while (keys(%live) < $jobs && @src_edst && + !$lei->{child_error}) { + my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ]; + $lei->qerr("# @$cmd"); + my $pid = spawn($cmd, undef, $opt); + $live{$pid} = [ \&reap_clone, $lei, $cmd ]; + } + } while (@src_edst && !$lei->{child_error}); + $sigchld->(0) while keys(%live); + return if $lei->{child_error}; + require PublicInbox::MultiGit; my $mg = PublicInbox::MultiGit->new($dst, 'all.git', 'git'); $mg->fill_alternates; @@ -330,7 +368,7 @@ failed to extract epoch number from $src } write_makefile($dst, 2); undef $on_destroy; # unlock - index_cloned_inbox($self, 2); + index_cloned_inbox($task, 2); } sub decode_manifest ($$$) { diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index e793a001..888c1f1e 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -291,6 +291,7 @@ sub run_script ($;$$) { my ($cmd, $env, $opt) = @_; my ($key, @argv) = @$cmd; my $run_mode = $ENV{TEST_RUN_MODE} // $opt->{run_mode} // 1; + $run_mode = 0 if $key eq '-clone'; # relies on SIGCHLD + waitpid(-1) my $sub = $run_mode == 0 ? undef : key2sub($key); my $fhref = []; my $spawn_opt = {}; diff --git a/script/public-inbox-clone b/script/public-inbox-clone index 4244e0c8..ce4697f3 100755 --- a/script/public-inbox-clone +++ b/script/public-inbox-clone @@ -22,7 +22,7 @@ options: -C DIR chdir to specified directory EOF GetOptions($opt, qw(help|h quiet|q verbose|v+ C=s@ c=s@ include|I=s@ exclude=s@ - no-torsocks torsocks=s epoch=s)) or die $help; + jobs|j=i no-torsocks torsocks=s epoch=s)) or die $help; if ($opt->{help}) { print $help; exit }; require PublicInbox::Admin; # loads Config PublicInbox::Admin::do_chdir(delete $opt->{C});