From: Eric Wong Date: Mon, 28 Nov 2022 05:31:13 +0000 (+0000) Subject: lei_mirror: consolidate clone process management X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=fe0bf750fa0ef00cad3e16cbffab1b359322a510;p=public-inbox.git lei_mirror: consolidate clone process management This simplifies our code by having fewer places check process limits and perform reaping. We'll also print command names immediately before executing, instead of right before waiting for running processes. --- diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index eaffb8fa..8796951d 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -118,9 +118,9 @@ sub _get_txt_start { # non-fatal my $cmd = $self->{curl}->for_uri($lei, $uri, qw(--compressed -R -o), $ft->filename); $self->{"-get_txt.$endpoint"} = [ $ft, $cmd, $uri ]; - $lei->qerr("# @$cmd"); my $jobs = $lei->{opt}->{jobs} // 1; reap_live() while keys(%LIVE) >= $jobs; + $lei->qerr("# @$cmd"); $LIVE{spawn($cmd, undef, $opt)} = [ \&_get_txt_done, $self, $endpoint, $fini ]; } @@ -223,6 +223,14 @@ sub run_reap { $ret; } +sub start_clone { + my ($self, $cmd, $opt, $fini) = @_; + my $jobs = $self->{lei}->{opt}->{jobs} // 1; + reap_live() while keys(%LIVE) >= $jobs; + $self->{lei}->qerr("# @$cmd"); + $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, $self, $cmd, $fini ]; +} + sub clone_v1 { my ($self, $nohang) = @_; my $lei = $self->{lei}; @@ -233,11 +241,8 @@ sub clone_v1 { my $pfx = $curl->torsocks($lei, $uri) or return; my $dst = $self->{cur_dst} // $self->{dst}; my $fini = PublicInbox::OnDestroy->new($$, \&v1_done, $self); - my $jobs = $self->{lei}->{opt}->{jobs} // 1; my $cmd = [ @$pfx, clone_cmd($lei, my $opt = {}), "$uri", $dst ]; - $lei->qerr("# @$cmd"); - $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, $lei, $cmd, $fini ]; - reap_live() while keys(%LIVE) >= $jobs; + start_clone($self, $cmd, $opt, $fini); _get_txt_start($self, '_/text/config/raw', $fini); my $d = $self->{-ent} ? $self->{-ent}->{description} : undef; @@ -309,12 +314,12 @@ EOM } sub reap_clone { # async, called via SIGCHLD - my ($lei, $cmd) = @_; + my ($self, $cmd) = @_; my $cerr = $?; $? = 0; # don't let it influence normal exit if ($cerr) { kill('TERM', keys %LIVE); - $lei->child_error($cerr, "@$cmd failed"); + $self->{lei}->child_error($cerr, "@$cmd failed"); } } @@ -395,17 +400,10 @@ failed to extract epoch number from $src $task->{-locked} = $lk->lock_for_scope($$); my @cmd = clone_cmd($lei, my $opt = {}); - my $jobs = $self->{lei}->{opt}->{jobs} // 1; - do { - reap_live() while keys(%LIVE) >= $jobs; - while (keys(%LIVE) < $jobs && @src_edst && - !$lei->{child_error}) { - my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ]; - $lei->qerr("# @$cmd"); - $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, - $lei, $cmd, $fini ]; - } - } while (@src_edst && !$lei->{child_error}); + while (@src_edst && !$lei->{child_error}) { + my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ]; + start_clone($self, $cmd, $opt, $fini); + } } sub decode_manifest ($$$) { @@ -508,7 +506,6 @@ sub try_manifest { } my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m); return $lei->child_error(1, $multi) if !ref($multi); - my $jobs = $self->{lei}->{opt}->{jobs} // 1; if (my $v2 = delete $multi->{v2}) { for my $name (sort keys %$v2) { my $epochs = delete $v2->{$name}; @@ -531,7 +528,6 @@ sub try_manifest { E: `$self->{cur_dst}' must not contain newline EOM clone_v2($self, \%v2_epochs, $m); - reap_live() while keys(%LIVE) >= $jobs; return if $self->{lei}->{child_error}; } } @@ -540,7 +536,6 @@ EOM chop($p) if substr($p, -1, 1) eq '/'; $uri->path($p); for my $name (@$v1) { - reap_live() while keys(%LIVE) >= $jobs; return if $self->{lei}->{child_error}; my $task = bless { %$self }, __PACKAGE__;