use Symbol qw(gensym);
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
+use PublicInbox::Syscall qw(rename_noreplace);
my %kw2char = ( # Maildir characters
draft => 'D',
sub git_to_mail { # git->cat_async callback
my ($bref, $oid, $type, $size, $arg) = @_;
+ $type // return; # called by git->async_abort
my ($write_cb, $smsg) = @$arg;
if ($type eq 'missing' && $smsg->{-lms_ro}) {
if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) {
my $zsfx = $self->{zsfx} or return;
my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
my ($r, $w) = @{delete $lei->{zpipe}};
- my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
+ my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
my $pid = spawn($cmd, undef, $rdr);
my $pp = gensym;
my $dup = bless { "pid.$pid" => $cmd }, ref($lei);
$rand = '';
do {
$base = $rand.$common.':2,'.kw2suffix($kw);
- } while (!($ok = link($tmp, $dst.$base)) && $!{EEXIST} &&
- ($rand = _rand.','));
- die "link($tmp, $dst$base): $!" unless $ok;
- unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
+ } while (!($ok = rename_noreplace($tmp, $dst.$base)) &&
+ $!{EEXIST} && ($rand = _rand.','));
\$base;
} else {
my $err = "Error writing $smsg->{blob} to $dst: $!\n";
$dedupe->pause_dedupe if $dedupe;
}
+sub v2w_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_;
+ my ($v2w, $lei) = @$arg;
+ $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
+}
+
sub _pre_augment_v2 {
my ($self, $lei) = @_;
my $dir = $self->{dst};
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
- $v2w->{-wq_no_bcast} = 1;
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+ $v2w->wq_wait_async(\&v2w_done_wait, $lei);
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
} else { # Maildir
$self->{shard_info} = [ $mod, $shard ];
}
- $aug = '+'; # incr_post_augment
+ $aug = 'incr_post_augment';
} elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment
- $aug = '.'; # do_post_augment
+ $aug = 'do_post_augment';
}
if ($aug) {
local $0 = 'do_augment';
eval { do_augment($self, $lei) };
$lei->fail($@) if $@;
- $lei->{pkt_op_p}->pkt_do($aug) == 1 or
- die "do_post_augment trigger: $!";
+ $lei->{pkt_op_p}->pkt_do($aug) or die "pkt_do($aug): $!";
}
# done augmenting, connect the compressor pipe for each worker
if (my $zpipe = delete $lei->{zpipe}) {
"scanning old contents of $dst for dedupe" :
"removing old contents of $dst")." ...\n";
};
- warn "E: $@" if $@;
+ warn "E: $@ ($dst)" if $@;
}
# called in top-level lei-daemon when LeiAuth is done
$self->{dst}, \$self->{-au_noted});
}
$self->wq_broadcast('do_post_auth');
- $self->wq_close(1);
+ $self->wq_close;
}
1;