use File::Temp ();
use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY);
use Carp qw(croak);
-use POSIX qw(WNOHANG);
+our %LIVE;
sub _wq_done_wait { # dwaitpid callback (via wq_eof)
my ($arg, $pid) = @_;
my ($n) = (m!/([0-9]+)\z!);
$n => [ URI->new($_), '' ]
} @v2_urls; # uniq
- return clone_v2($self, \%v2_epochs);
+ clone_v2($self, \%v2_epochs);
+ reap_live() while keys(%LIVE);
+ return;
}
# filter out common URLs served by WWW (e.g /$MSGID/T/)
}
sub reap_clone { # async, called via SIGCHLD
- my ($lei, $cmd, $live) = @_;
+ my ($lei, $cmd) = @_;
my $cerr = $?;
$? = 0; # don't let it influence normal exit
if ($cerr) {
- kill('TERM', keys %$live);
+ kill('TERM', keys %LIVE);
$lei->child_error($cerr, "@$cmd failed");
}
}
-sub v2_done {
+sub v2_done { # called via OnDestroy
my ($self) = @_;
require PublicInbox::MultiGit;
my $dst = $self->{cur_dst} // $self->{dst};
index_cloned_inbox($self, 2);
}
+sub reap_live {
+ my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+ if (my $x = delete $LIVE{$pid}) {
+ my $cb = shift @$x;
+ $cb->(@$x);
+ } else {
+ warn "reaped unknown PID=$pid ($?)\n";
+ }
+}
+
sub clone_v2 ($$;$) {
my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref
my $lei = $self->{lei};
# filter out the epochs we skipped
$self->{-culled_manifest} = 1 if delete(@$m{@skip});
my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
- my %live;
my $fini = PublicInbox::OnDestroy->new($$, \&v2_done, $task);
- $live{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ];
+ $LIVE{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ];
$task->{-locked} = $lk->lock_for_scope($$);
my @cmd = clone_cmd($lei, my $opt = {});
my $jobs = $self->{lei}->{opt}->{jobs} // 2;
- my $sigchld = sub {
- my ($sig) = @_;
- my $flags = $sig ? WNOHANG : 0;
- while (1) {
- my $pid = waitpid(-1, $flags) or return;
- return if $pid < 0;
- if (my $x = delete $live{$pid}) {
- my $cb = shift @$x;
- $cb->(@$x, \%live);
- } else {
- warn "reaped unknown PID=$pid ($?)\n";
- }
- }
- };
do {
- $sigchld->(0) while keys(%live) >= $jobs;
- while (keys(%live) < $jobs && @src_edst &&
+ reap_live() while keys(%LIVE) >= $jobs;
+ while (keys(%LIVE) < $jobs && @src_edst &&
!$lei->{child_error}) {
my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ];
$lei->qerr("# @$cmd");
- my $pid = spawn($cmd, undef, $opt);
- $live{$pid} = [ \&reap_clone, $lei, $cmd, $fini ];
+ $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone,
+ $lei, $cmd, $fini ];
}
} while (@src_edst && !$lei->{child_error});
- $sigchld->(0) while keys(%live);
}
sub decode_manifest ($$$) {
my $opt = { -C => $pdir };
$opt->{$_} = $lei->{$_} for (0..2);
my $cerr = run_reap($lei, $cmd, $opt);
+ local %LIVE;
if ($cerr) {
return try_scrape($self) if ($cerr >> 8) == 22; # 404 missing
return $lei->child_error($cerr, "@$cmd failed");
}
my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m);
return $lei->child_error(1, $multi) if !ref($multi);
+ my $jobs = $self->{lei}->{opt}->{jobs} // 2;
if (my $v2 = delete $multi->{v2}) {
for my $name (sort keys %$v2) {
my $epochs = delete $v2->{$name};
E: `$self->{cur_dst}' must not contain newline
EOM
clone_v2($self, \%v2_epochs, $m);
+ reap_live() while keys(%LIVE) >= $jobs;
+ return if $self->{lei}->{child_error};
}
}
if (my $v1 = delete $multi->{v1}) {
clone_v1($self, 1);
}
}
+ reap_live() while keys(%LIVE);
if (delete $self->{-culled_manifest}) { # set by clone_v2/-I/--exclude
# write the smaller manifest if epochs were skipped so
# users won't have to delete manifest if they +w an
eval {
my $iv = $lei->{opt}->{'inbox-version'};
if (defined $iv) {
+ local %LIVE;
return clone_v1($self) if $iv == 1;
return try_scrape($self) if $iv == 2;
die "bad --inbox-version=$iv\n";