X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=d33d27aec00646879c89f702b4c3610775f47b09;hb=4cd7a78f3b8c03670e2d77675229472506eee1eb;hp=ed609081b0a4c5dde199a0b5f47ce39989c1a652;hpb=702864ea3393f4c7a91336b277083bf9fd4b0860;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index ed609081..d33d27ae 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -12,6 +12,7 @@ use PublicInbox::Spawn qw(spawn); use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); +use PublicInbox::Syscall qw(rename_noreplace); my %kw2char = ( # Maildir characters draft => 'D', @@ -132,6 +133,7 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $arg) = @_; + $type // return; # called by git->async_abort my ($write_cb, $smsg) = @$arg; if ($type eq 'missing' && $smsg->{-lms_ro}) { if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) { @@ -157,7 +159,7 @@ sub _post_augment_mbox { # open a compressor process from top-level process my $zsfx = $self->{zsfx} or return; my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei); my ($r, $w) = @{delete $lei->{zpipe}}; - my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; + my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 }; my $pid = spawn($cmd, undef, $rdr); my $pp = gensym; my $dup = bless { "pid.$pid" => $cmd }, ref($lei); @@ -261,10 +263,8 @@ sub _buf2maildir ($$$$) { $rand = ''; do { $base = $rand.$common.':2,'.kw2suffix($kw); - } while (!($ok = link($tmp, $dst.$base)) && $!{EEXIST} && - ($rand = _rand.',')); - die "link($tmp, $dst$base): $!" unless $ok; - unlink($tmp) or warn "W: failed to unlink $tmp: $!\n"; + } while (!($ok = rename_noreplace($tmp, $dst.$base)) && + $!{EEXIST} && ($rand = _rand.',')); \$base; } else { my $err = "Error writing $smsg->{blob} to $dst: $!\n"; @@ -637,6 +637,12 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } +sub v2w_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($v2w, $lei) = @$arg; + $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; +} + sub _pre_augment_v2 { my ($self, $lei) = @_; my $dir = $self->{dst}; @@ -659,8 +665,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->{-wq_no_bcast} = 1; $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $v2w->wq_wait_async(\&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; @@ -714,16 +720,15 @@ sub do_post_auth { } else { # Maildir $self->{shard_info} = [ $mod, $shard ]; } - $aug = '+'; # incr_post_augment + $aug = 'incr_post_augment'; } elsif ($self->{-wq_worker_nr} == 0) { # 1st worker do_augment - $aug = '.'; # do_post_augment + $aug = 'do_post_augment'; } if ($aug) { local $0 = 'do_augment'; eval { do_augment($self, $lei) }; $lei->fail($@) if $@; - $lei->{pkt_op_p}->pkt_do($aug) == 1 or - die "do_post_augment trigger: $!"; + $lei->{pkt_op_p}->pkt_do($aug) or die "pkt_do($aug): $!"; } # done augmenting, connect the compressor pipe for each worker if (my $zpipe = delete $lei->{zpipe}) { @@ -789,12 +794,15 @@ sub wq_atexit_child { # runs on a 1s timer in lei-daemon sub augment_inprogress { my ($err, $opt, $dst, $au_noted) = @_; - $$au_noted++ and return; - print $err '# '.($opt->{'import-before'} ? - "importing non-external contents of $dst" : ( - ($opt->{dedupe} // 'content') ne 'none') ? - "scanning old contents of $dst for dedupe" : - "removing old contents of $dst")." ...\n"; + eval { + return if $$au_noted++ || !$err || !defined(fileno($err)); + print $err '# '.($opt->{'import-before'} ? + "importing non-external contents of $dst" : ( + ($opt->{dedupe} // 'content') ne 'none') ? + "scanning old contents of $dst for dedupe" : + "removing old contents of $dst")." ...\n"; + }; + warn "E: $@ ($dst)" if $@; } # called in top-level lei-daemon when LeiAuth is done @@ -809,7 +817,7 @@ sub net_merge_all_done { $self->{dst}, \$self->{-au_noted}); } $self->wq_broadcast('do_post_auth'); - $self->wq_close(1); + $self->wq_close; } 1;