We no longer rely on SIGCHLD for predictability, and instead
call waitpid at safe points. This will make it easier for us to
do parallel mirroring of multiple inboxes while preserving
proper dependencies via ->DESTROY callbacks.
use File::Temp ();
use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY);
use Carp qw(croak);
use File::Temp ();
use Fcntl qw(SEEK_SET O_CREAT O_EXCL O_WRONLY);
use Carp qw(croak);
sub _wq_done_wait { # dwaitpid callback (via wq_eof)
my ($arg, $pid) = @_;
sub _wq_done_wait { # dwaitpid callback (via wq_eof)
my ($arg, $pid) = @_;
my ($n) = (m!/([0-9]+)\z!);
$n => [ URI->new($_), '' ]
} @v2_urls; # uniq
my ($n) = (m!/([0-9]+)\z!);
$n => [ URI->new($_), '' ]
} @v2_urls; # uniq
- return clone_v2($self, \%v2_epochs);
+ clone_v2($self, \%v2_epochs);
+ reap_live() while keys(%LIVE);
+ return;
}
# filter out common URLs served by WWW (e.g /$MSGID/T/)
}
# filter out common URLs served by WWW (e.g /$MSGID/T/)
}
sub reap_clone { # async, called via SIGCHLD
}
sub reap_clone { # async, called via SIGCHLD
- my ($lei, $cmd, $live) = @_;
my $cerr = $?;
$? = 0; # don't let it influence normal exit
if ($cerr) {
my $cerr = $?;
$? = 0; # don't let it influence normal exit
if ($cerr) {
- kill('TERM', keys %$live);
+ kill('TERM', keys %LIVE);
$lei->child_error($cerr, "@$cmd failed");
}
}
$lei->child_error($cerr, "@$cmd failed");
}
}
+sub v2_done { # called via OnDestroy
my ($self) = @_;
require PublicInbox::MultiGit;
my $dst = $self->{cur_dst} // $self->{dst};
my ($self) = @_;
require PublicInbox::MultiGit;
my $dst = $self->{cur_dst} // $self->{dst};
index_cloned_inbox($self, 2);
}
index_cloned_inbox($self, 2);
}
+sub reap_live {
+ my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+ if (my $x = delete $LIVE{$pid}) {
+ my $cb = shift @$x;
+ $cb->(@$x);
+ } else {
+ warn "reaped unknown PID=$pid ($?)\n";
+ }
+}
+
sub clone_v2 ($$;$) {
my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref
my $lei = $self->{lei};
sub clone_v2 ($$;$) {
my ($self, $v2_epochs, $m) = @_; # $m => manifest.js.gz hashref
my $lei = $self->{lei};
# filter out the epochs we skipped
$self->{-culled_manifest} = 1 if delete(@$m{@skip});
my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
# filter out the epochs we skipped
$self->{-culled_manifest} = 1 if delete(@$m{@skip});
my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock';
my $fini = PublicInbox::OnDestroy->new($$, \&v2_done, $task);
my $fini = PublicInbox::OnDestroy->new($$, \&v2_done, $task);
- $live{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ];
+ $LIVE{_try_config_start($task)} = [ \&_try_config_done, $task, $fini ];
$task->{-locked} = $lk->lock_for_scope($$);
my @cmd = clone_cmd($lei, my $opt = {});
my $jobs = $self->{lei}->{opt}->{jobs} // 2;
$task->{-locked} = $lk->lock_for_scope($$);
my @cmd = clone_cmd($lei, my $opt = {});
my $jobs = $self->{lei}->{opt}->{jobs} // 2;
- my $sigchld = sub {
- my ($sig) = @_;
- my $flags = $sig ? WNOHANG : 0;
- while (1) {
- my $pid = waitpid(-1, $flags) or return;
- return if $pid < 0;
- if (my $x = delete $live{$pid}) {
- my $cb = shift @$x;
- $cb->(@$x, \%live);
- } else {
- warn "reaped unknown PID=$pid ($?)\n";
- }
- }
- };
- $sigchld->(0) while keys(%live) >= $jobs;
- while (keys(%live) < $jobs && @src_edst &&
+ reap_live() while keys(%LIVE) >= $jobs;
+ while (keys(%LIVE) < $jobs && @src_edst &&
!$lei->{child_error}) {
my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ];
$lei->qerr("# @$cmd");
!$lei->{child_error}) {
my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ];
$lei->qerr("# @$cmd");
- my $pid = spawn($cmd, undef, $opt);
- $live{$pid} = [ \&reap_clone, $lei, $cmd, $fini ];
+ $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone,
+ $lei, $cmd, $fini ];
}
} while (@src_edst && !$lei->{child_error});
}
} while (@src_edst && !$lei->{child_error});
- $sigchld->(0) while keys(%live);
}
sub decode_manifest ($$$) {
}
sub decode_manifest ($$$) {
my $opt = { -C => $pdir };
$opt->{$_} = $lei->{$_} for (0..2);
my $cerr = run_reap($lei, $cmd, $opt);
my $opt = { -C => $pdir };
$opt->{$_} = $lei->{$_} for (0..2);
my $cerr = run_reap($lei, $cmd, $opt);
if ($cerr) {
return try_scrape($self) if ($cerr >> 8) == 22; # 404 missing
return $lei->child_error($cerr, "@$cmd failed");
if ($cerr) {
return try_scrape($self) if ($cerr >> 8) == 22; # 404 missing
return $lei->child_error($cerr, "@$cmd failed");
}
my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m);
return $lei->child_error(1, $multi) if !ref($multi);
}
my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m);
return $lei->child_error(1, $multi) if !ref($multi);
+ my $jobs = $self->{lei}->{opt}->{jobs} // 2;
if (my $v2 = delete $multi->{v2}) {
for my $name (sort keys %$v2) {
my $epochs = delete $v2->{$name};
if (my $v2 = delete $multi->{v2}) {
for my $name (sort keys %$v2) {
my $epochs = delete $v2->{$name};
E: `$self->{cur_dst}' must not contain newline
EOM
clone_v2($self, \%v2_epochs, $m);
E: `$self->{cur_dst}' must not contain newline
EOM
clone_v2($self, \%v2_epochs, $m);
+ reap_live() while keys(%LIVE) >= $jobs;
+ return if $self->{lei}->{child_error};
}
}
if (my $v1 = delete $multi->{v1}) {
}
}
if (my $v1 = delete $multi->{v1}) {
+ reap_live() while keys(%LIVE);
if (delete $self->{-culled_manifest}) { # set by clone_v2/-I/--exclude
# write the smaller manifest if epochs were skipped so
# users won't have to delete manifest if they +w an
if (delete $self->{-culled_manifest}) { # set by clone_v2/-I/--exclude
# write the smaller manifest if epochs were skipped so
# users won't have to delete manifest if they +w an
eval {
my $iv = $lei->{opt}->{'inbox-version'};
if (defined $iv) {
eval {
my $iv = $lei->{opt}->{'inbox-version'};
if (defined $iv) {
return clone_v1($self) if $iv == 1;
return try_scrape($self) if $iv == 2;
die "bad --inbox-version=$iv\n";
return clone_v1($self) if $iv == 1;
return try_scrape($self) if $iv == 2;
die "bad --inbox-version=$iv\n";