]> Sergey Matveev's repositories - public-inbox.git/commitdiff
clone: parallelize v2 epoch clones
authorEric Wong <e@80x24.org>
Mon, 28 Nov 2022 05:31:00 +0000 (05:31 +0000)
committerEric Wong <e@80x24.org>
Mon, 28 Nov 2022 23:38:53 +0000 (23:38 +0000)
This is a first step in supporting completely parallelized
clones.  Eventually, everything will be parallelized and
dependencies will be managed via callbacks.

lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/TestCommon.pm
script/public-inbox-clone

index d501764213c42a4fadf5b873128cc943bd7ec4b7..a8aa6a9f0654d7a2d2257c67a3c1c1eb9dfe1d1e 100644 (file)
@@ -14,6 +14,7 @@ use PublicInbox::Spawn qw(popen_rd spawn);
 use File::Temp ();
 use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY);
 use Carp qw(croak);
+use POSIX qw(WNOHANG);
 
 sub _wq_done_wait { # dwaitpid callback (via wq_eof)
        my ($arg, $pid) = @_;
@@ -283,6 +284,16 @@ EOM
        close $fh or die "close:($f): $!";
 }
 
+sub reap_clone { # async, called via SIGCHLD
+       my ($lei, $cmd, $live) = @_;
+       my $cerr = $?;
+       $? = 0; # don't let it influence normal exit
+       if ($cerr) {
+               kill('TERM', keys %$live);
+               $lei->child_error($cerr, "@$cmd failed");
+       }
+}
+
 sub clone_v2 ($$;$) {
        my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref
        my $lei = $self->{lei};
@@ -312,14 +323,41 @@ failed to extract epoch number from $src
        # filter out the epochs we skipped
        $self->{-culled_manifest} = 1 if delete(@$m{@skip});
        my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
-       _try_config($self);
+       my $task = $m ? bless { %$self }, __PACKAGE__ : $self;
+
+       _try_config($task);
        my $on_destroy = $lk->lock_for_scope($$);
        my @cmd = clone_cmd($lei, my $opt = {});
-       while (my ($src, $edst) = splice(@src_edst, 0, 2)) {
-               my $cmd = [ @$pfx, @cmd, $src, $edst ];
-               my $cerr = run_reap($lei, $cmd, $opt);
-               return $lei->child_error($cerr, "@$cmd failed") if $cerr;
-       }
+       my $jobs = $self->{lei}->{opt}->{jobs} // 2;
+       my %live;
+       my $sigchld = sub {
+               my ($sig) = @_;
+               my $flags = $sig ? WNOHANG : 0;
+               while (1) {
+                       my $pid = waitpid(-1, $flags) or return;
+                       return if $pid < 0;
+                       if (my $x = delete $live{$pid}) {
+                               my $cb = shift @$x;
+                               $cb->(@$x, \%live);
+                       } else {
+                               warn "reaped unknown PID=$pid ($?)\n";
+                       }
+               }
+       };
+       do {
+               $sigchld->(0) while keys(%live) >= $jobs;
+               local $SIG{CHLD} = $sigchld;
+               while (keys(%live) < $jobs && @src_edst &&
+                               !$lei->{child_error}) {
+                       my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ];
+                       $lei->qerr("# @$cmd");
+                       my $pid = spawn($cmd, undef, $opt);
+                       $live{$pid} = [ \&reap_clone, $lei, $cmd ];
+               }
+       } while (@src_edst && !$lei->{child_error});
+       $sigchld->(0) while keys(%live);
+       return if $lei->{child_error};
+
        require PublicInbox::MultiGit;
        my $mg = PublicInbox::MultiGit->new($dst, 'all.git', 'git');
        $mg->fill_alternates;
@@ -330,7 +368,7 @@ failed to extract epoch number from $src
        }
        write_makefile($dst, 2);
        undef $on_destroy; # unlock
-       index_cloned_inbox($self, 2);
+       index_cloned_inbox($task, 2);
 }
 
 sub decode_manifest ($$$) {
index e793a001e20e94a4cdce121155fd12bfaff9a84c..888c1f1efbd44f6d8d191bd5720889dc0c8abfec 100644 (file)
@@ -291,6 +291,7 @@ sub run_script ($;$$) {
        my ($cmd, $env, $opt) = @_;
        my ($key, @argv) = @$cmd;
        my $run_mode = $ENV{TEST_RUN_MODE} // $opt->{run_mode} // 1;
+       $run_mode = 0 if $key eq '-clone'; # relies on SIGCHLD + waitpid(-1)
        my $sub = $run_mode == 0 ? undef : key2sub($key);
        my $fhref = [];
        my $spawn_opt = {};
index 4244e0c8cc688a9fbb3e8f9d224f3575895a0547..ce4697f3917edd655a925561b71e11ea08c2664f 100755 (executable)
@@ -22,7 +22,7 @@ options:
     -C DIR            chdir to specified directory
 EOF
 GetOptions($opt, qw(help|h quiet|q verbose|v+ C=s@ c=s@ include|I=s@ exclude=s@
-               no-torsocks torsocks=s epoch=s)) or die $help;
+               jobs|j=i no-torsocks torsocks=s epoch=s)) or die $help;
 if ($opt->{help}) { print $help; exit };
 require PublicInbox::Admin; # loads Config
 PublicInbox::Admin::do_chdir(delete $opt->{C});