From fe0bf750fa0ef00cad3e16cbffab1b359322a510 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 28 Nov 2022 05:31:13 +0000 Subject: [PATCH] lei_mirror: consolidate clone process management This simplifies our code by having fewer places check process limits and perform reaping. We'll also print command names immediately before executing, instead of right before waiting for running processes. --- lib/PublicInbox/LeiMirror.pm | 37 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index eaffb8fa..8796951d 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -118,9 +118,9 @@ sub _get_txt_start { # non-fatal my $cmd = $self->{curl}->for_uri($lei, $uri, qw(--compressed -R -o), $ft->filename); $self->{"-get_txt.$endpoint"} = [ $ft, $cmd, $uri ]; - $lei->qerr("# @$cmd"); my $jobs = $lei->{opt}->{jobs} // 1; reap_live() while keys(%LIVE) >= $jobs; + $lei->qerr("# @$cmd"); $LIVE{spawn($cmd, undef, $opt)} = [ \&_get_txt_done, $self, $endpoint, $fini ]; } @@ -223,6 +223,14 @@ sub run_reap { $ret; } +sub start_clone { + my ($self, $cmd, $opt, $fini) = @_; + my $jobs = $self->{lei}->{opt}->{jobs} // 1; + reap_live() while keys(%LIVE) >= $jobs; + $self->{lei}->qerr("# @$cmd"); + $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, $self, $cmd, $fini ]; +} + sub clone_v1 { my ($self, $nohang) = @_; my $lei = $self->{lei}; @@ -233,11 +241,8 @@ sub clone_v1 { my $pfx = $curl->torsocks($lei, $uri) or return; my $dst = $self->{cur_dst} // $self->{dst}; my $fini = PublicInbox::OnDestroy->new($$, \&v1_done, $self); - my $jobs = $self->{lei}->{opt}->{jobs} // 1; my $cmd = [ @$pfx, clone_cmd($lei, my $opt = {}), "$uri", $dst ]; - $lei->qerr("# @$cmd"); - $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, $lei, $cmd, $fini ]; - reap_live() while keys(%LIVE) >= $jobs; + start_clone($self, $cmd, $opt, $fini); _get_txt_start($self, '_/text/config/raw', $fini); my $d = $self->{-ent} ? $self->{-ent}->{description} : undef; @@ -309,12 +314,12 @@ EOM } sub reap_clone { # async, called via SIGCHLD - my ($lei, $cmd) = @_; + my ($self, $cmd) = @_; my $cerr = $?; $? = 0; # don't let it influence normal exit if ($cerr) { kill('TERM', keys %LIVE); - $lei->child_error($cerr, "@$cmd failed"); + $self->{lei}->child_error($cerr, "@$cmd failed"); } } @@ -395,17 +400,10 @@ failed to extract epoch number from $src $task->{-locked} = $lk->lock_for_scope($$); my @cmd = clone_cmd($lei, my $opt = {}); - my $jobs = $self->{lei}->{opt}->{jobs} // 1; - do { - reap_live() while keys(%LIVE) >= $jobs; - while (keys(%LIVE) < $jobs && @src_edst && - !$lei->{child_error}) { - my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ]; - $lei->qerr("# @$cmd"); - $LIVE{spawn($cmd, undef, $opt)} = [ \&reap_clone, - $lei, $cmd, $fini ]; - } - } while (@src_edst && !$lei->{child_error}); + while (@src_edst && !$lei->{child_error}) { + my $cmd = [ @$pfx, @cmd, splice(@src_edst, 0, 2) ]; + start_clone($self, $cmd, $opt, $fini); + } } sub decode_manifest ($$$) { @@ -508,7 +506,6 @@ sub try_manifest { } my ($path_pfx, $n, $multi) = multi_inbox($self, \$path, $m); return $lei->child_error(1, $multi) if !ref($multi); - my $jobs = $self->{lei}->{opt}->{jobs} // 1; if (my $v2 = delete $multi->{v2}) { for my $name (sort keys %$v2) { my $epochs = delete $v2->{$name}; @@ -531,7 +528,6 @@ sub try_manifest { E: `$self->{cur_dst}' must not contain newline EOM clone_v2($self, \%v2_epochs, $m); - reap_live() while keys(%LIVE) >= $jobs; return if $self->{lei}->{child_error}; } } @@ -540,7 +536,6 @@ EOM chop($p) if substr($p, -1, 1) eq '/'; $uri->path($p); for my $name (@$v1) { - reap_live() while keys(%LIVE) >= $jobs; return if $self->{lei}->{child_error}; my $task = bless { %$self }, __PACKAGE__; -- 2.44.0