use File::Temp ();
use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY);
use Carp qw(croak);
+use POSIX qw(WNOHANG);
sub _wq_done_wait { # dwaitpid callback (via wq_eof)
my ($arg, $pid) = @_;
close $fh or die "close:($f): $!";
}
+sub reap_clone { # async, called via SIGCHLD
+ my ($lei, $cmd, $live) = @_;
+ my $cerr = $?;
+ $? = 0; # don't let it influence normal exit
+ if ($cerr) {
+ kill('TERM', keys %$live);
+ $lei->child_error($cerr, "@$cmd failed");
+ }
+}
+
sub clone_v2 ($$;$) {
my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref
my $lei = $self->{lei};
# filter out the epochs we skipped
$self->{-culled_manifest} = 1 if delete(@$m{@skip});
my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
- _try_config($self);
+ my $task = $m ? bless { %$self }, __PACKAGE__ : $self;
+
+ _try_config($task);
my $on_destroy = $lk->lock_for_scope($$);
my @cmd = clone_cmd($lei, my $opt = {});
- while (my ($src, $edst) = splice(@src_edst, 0, 2)) {
- my $cmd = [ @$pfx, @cmd, $src, $edst ];
- my $cerr = run_reap($lei, $cmd, $opt);
- return $lei->child_error($cerr, "@$cmd failed") if $cerr;
- }
+ my $jobs = $self->{lei}->{opt}->{jobs} // 2;
+ my %live;
+ my $sigchld = sub {
+ my ($sig) = @_;
+ my $flags = $sig ? WNOHANG : 0;
+ while (1) {
+ my $pid = waitpid(-1, $flags) or return;
+ return if $pid < 0;
+ if (my $x = delete $live{$pid}) {
+ my $cb = shift @$x;
+ $cb->(@$x, \%live);
+ } else {
+ warn "reaped unknown PID=$pid ($?)\n";
+ }
+ }
+ };
+ do {
+ $sigchld->(0) while keys(%live) >= $jobs;
+ local $SIG{CHLD} = $sigchld;
+ while (keys(%live) < $jobs && @src_edst &&
+ !$lei->{child_error}) {
+ my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ];
+ $lei->qerr("# @$cmd");
+ my $pid = spawn($cmd, undef, $opt);
+ $live{$pid} = [ \&reap_clone, $lei, $cmd ];
+ }
+ } while (@src_edst && !$lei->{child_error});
+ $sigchld->(0) while keys(%live);
+ return if $lei->{child_error};
+
require PublicInbox::MultiGit;
my $mg = PublicInbox::MultiGit->new($dst, 'all.git', 'git');
$mg->fill_alternates;
}
write_makefile($dst, 2);
undef $on_destroy; # unlock
- index_cloned_inbox($self, 2);
+ index_cloned_inbox($task, 2);
}
sub decode_manifest ($$$) {