]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
lei: TSTP affects all curl and related subprocesses
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index a419b83f211cc8757f4988e686dab30dd9266191..9c748deaed161db7b9e2b02674a521fcc6ce5e0a 100644 (file)
@@ -157,7 +157,7 @@ sub _post_augment_mbox { # open a compressor process from top-level process
        my $zsfx = $self->{zsfx} or return;
        my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
        my ($r, $w) = @{delete $lei->{zpipe}};
-       my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
+       my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
        my $pid = spawn($cmd, undef, $rdr);
        my $pp = gensym;
        my $dup = bless { "pid.$pid" => $cmd }, ref($lei);
@@ -689,6 +689,8 @@ sub do_augment { # slow, runs in wq worker
 # fast (spawn compressor or mkdir), runs in same process as pre_augment
 sub post_augment {
        my ($self, $lei, @args) = @_;
+       $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
+
        my $wait = $lei->{opt}->{'import-before'} ?
                        $lei->{sto}->wq_do('checkpoint', 1) : 0;
        # _post_augment_mbox
@@ -712,16 +714,15 @@ sub do_post_auth {
                } else { # Maildir
                        $self->{shard_info} = [ $mod, $shard ];
                }
-               $aug = '+'; # incr_post_augment
+               $aug = 'incr_post_augment';
        } elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment
-               $aug = '.'; # do_post_augment
+               $aug = 'do_post_augment';
        }
        if ($aug) {
                local $0 = 'do_augment';
                eval { do_augment($self, $lei) };
                $lei->fail($@) if $@;
-               $lei->{pkt_op_p}->pkt_do($aug) == 1 or
-                               die "do_post_augment trigger: $!";
+               $lei->{pkt_op_p}->pkt_do($aug) or die "pkt_do($aug): $!";
        }
        # done augmenting, connect the compressor pipe for each worker
        if (my $zpipe = delete $lei->{zpipe}) {
@@ -784,9 +785,31 @@ sub wq_atexit_child {
        $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr);
 }
 
+# runs on a 1s timer in lei-daemon
+sub augment_inprogress {
+       my ($err, $opt, $dst, $au_noted) = @_;
+       eval {
+               return if $$au_noted++ || !$err || !defined(fileno($err));
+               print $err '# '.($opt->{'import-before'} ?
+                               "importing non-external contents of $dst" : (
+                               ($opt->{dedupe} // 'content') ne 'none') ?
+                               "scanning old contents of $dst for dedupe" :
+                               "removing old contents of $dst")." ...\n";
+       };
+       warn "E: $@ ($dst)" if $@;
+}
+
 # called in top-level lei-daemon when LeiAuth is done
 sub net_merge_all_done {
-       my ($self) = @_;
+       my ($self, $lei) = @_;
+       if ($PublicInbox::DS::in_loop &&
+                       $self->can("_do_augment_$self->{base_type}") &&
+                       !$lei->{opt}->{quiet}) {
+               $self->{-au_noted} = 0;
+               PublicInbox::DS::add_timer(1, \&augment_inprogress,
+                               $lei->{2}, $lei->{opt},
+                               $self->{dst}, \$self->{-au_noted});
+       }
        $self->wq_broadcast('do_post_auth');
        $self->wq_close(1);
 }