my $zsfx = $self->{zsfx} or return;
my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
my ($r, $w) = @{delete $lei->{zpipe}};
- my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
+ my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
my $pid = spawn($cmd, undef, $rdr);
my $pp = gensym;
my $dup = bless { "pid.$pid" => $cmd }, ref($lei);
# fast (spawn compressor or mkdir), runs in same process as pre_augment
sub post_augment {
my ($self, $lei, @args) = @_;
+ $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
+
my $wait = $lei->{opt}->{'import-before'} ?
$lei->{sto}->wq_do('checkpoint', 1) : 0;
# _post_augment_mbox
} else { # Maildir
$self->{shard_info} = [ $mod, $shard ];
}
- $aug = '+'; # incr_post_augment
+ $aug = 'incr_post_augment';
} elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment
- $aug = '.'; # do_post_augment
+ $aug = 'do_post_augment';
}
if ($aug) {
local $0 = 'do_augment';
eval { do_augment($self, $lei) };
$lei->fail($@) if $@;
- $lei->{pkt_op_p}->pkt_do($aug) == 1 or
- die "do_post_augment trigger: $!";
+ $lei->{pkt_op_p}->pkt_do($aug) or die "pkt_do($aug): $!";
}
# done augmenting, connect the compressor pipe for each worker
if (my $zpipe = delete $lei->{zpipe}) {
$lei->{pkt_op_p}->pkt_do('l2m_progress', $nr);
}
+# runs on a 1s timer in lei-daemon
+sub augment_inprogress {
+ my ($err, $opt, $dst, $au_noted) = @_;
+ eval {
+ return if $$au_noted++ || !$err || !defined(fileno($err));
+ print $err '# '.($opt->{'import-before'} ?
+ "importing non-external contents of $dst" : (
+ ($opt->{dedupe} // 'content') ne 'none') ?
+ "scanning old contents of $dst for dedupe" :
+ "removing old contents of $dst")." ...\n";
+ };
+ warn "E: $@ ($dst)" if $@;
+}
+
# called in top-level lei-daemon when LeiAuth is done
sub net_merge_all_done {
- my ($self) = @_;
+ my ($self, $lei) = @_;
+ if ($PublicInbox::DS::in_loop &&
+ $self->can("_do_augment_$self->{base_type}") &&
+ !$lei->{opt}->{quiet}) {
+ $self->{-au_noted} = 0;
+ PublicInbox::DS::add_timer(1, \&augment_inprogress,
+ $lei->{2}, $lei->{opt},
+ $self->{dst}, \$self->{-au_noted});
+ }
$self->wq_broadcast('do_post_auth');
$self->wq_close(1);
}