X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=d33d27aec00646879c89f702b4c3610775f47b09;hb=4cd7a78f3b8c03670e2d77675229472506eee1eb;hp=d42759cf71d4d372612fdbc4c79efcf270533e1d;hpb=d0100ddd9fe00197cc846d83e70b7611c57dbf43;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index d42759cf..d33d27ae 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -12,6 +12,7 @@ use PublicInbox::Spawn qw(spawn); use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); +use PublicInbox::Syscall qw(rename_noreplace); my %kw2char = ( # Maildir characters draft => 'D', @@ -132,6 +133,7 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $arg) = @_; + $type // return; # called by git->async_abort my ($write_cb, $smsg) = @$arg; if ($type eq 'missing' && $smsg->{-lms_ro}) { if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) { @@ -157,7 +159,7 @@ sub _post_augment_mbox { # open a compressor process from top-level process my $zsfx = $self->{zsfx} or return; my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei); my ($r, $w) = @{delete $lei->{zpipe}}; - my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; + my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 }; my $pid = spawn($cmd, undef, $rdr); my $pp = gensym; my $dup = bless { "pid.$pid" => $cmd }, ref($lei); @@ -261,10 +263,8 @@ sub _buf2maildir ($$$$) { $rand = ''; do { $base = $rand.$common.':2,'.kw2suffix($kw); - } while (!($ok = link($tmp, $dst.$base)) && $!{EEXIST} && - ($rand = _rand.',')); - die "link($tmp, $dst$base): $!" unless $ok; - unlink($tmp) or warn "W: failed to unlink $tmp: $!\n"; + } while (!($ok = rename_noreplace($tmp, $dst.$base)) && + $!{EEXIST} && ($rand = _rand.',')); \$base; } else { my $err = "Error writing $smsg->{blob} to $dst: $!\n"; @@ -637,6 +637,12 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } +sub v2w_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($v2w, $lei) = @$arg; + $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; +} + sub _pre_augment_v2 { my ($self, $lei) = @_; my $dir = $self->{dst}; @@ -659,8 +665,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->{-wq_no_bcast} = 1; $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $v2w->wq_wait_async(\&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; @@ -796,7 +802,7 @@ sub augment_inprogress { "scanning old contents of $dst for dedupe" : "removing old contents of $dst")." ...\n"; }; - warn "E: $@" if $@; + warn "E: $@ ($dst)" if $@; } # called in top-level lei-daemon when LeiAuth is done @@ -811,7 +817,7 @@ sub net_merge_all_done { $self->{dst}, \$self->{-au_noted}); } $self->wq_broadcast('do_post_auth'); - $self->wq_close(1); + $self->wq_close; } 1;