X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=31eba794066a9871a9e6b80567f1e3c296d16650;hb=HEAD;hp=76e103c7c2351638902bac42cd6290488e9e761d;hpb=a1733d3406dfbde52d1468e671edd1d76893f546;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 76e103c7..31eba794 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Writes PublicInbox::Eml objects atomically to a mbox variant or Maildir @@ -12,6 +12,7 @@ use PublicInbox::Spawn qw(spawn); use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); +use PublicInbox::Syscall qw(rename_noreplace); my %kw2char = ( # Maildir characters draft => 'D', @@ -131,22 +132,26 @@ sub eml2mboxcl2 { } sub git_to_mail { # git->cat_async callback - my ($bref, $oid, $type, $size, $arg) = @_; - my ($write_cb, $smsg) = @$arg; - if ($type eq 'missing' && $smsg->{-lms_ro}) { - if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) { + my ($bref, $oid, $type, $size, $smsg) = @_; + my $self = delete $smsg->{l2m} // die "BUG: no l2m"; + $type // return; # called by git->async_abort + eval { + if ($type eq 'missing' && + ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { $type = 'blob'; $size = length($$bref); } - } - return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob'; - return warn("E: $oid is empty\n") unless $size; - die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid; - $write_cb->($bref, $smsg); + $type eq 'blob' or return $self->{lei}->child_error(1, + "W: $oid is $type (!= blob)"); + $size or return $self->{lei}->child_error(1,"E: $oid is empty"); + $smsg->{blob} eq $oid or die "BUG: expected=$smsg->{blob}"; + $self->{wcb}->($bref, $smsg); + }; + $self->{lei}->fail("$@ (oid=$oid)") if $@; } -sub reap_compress { # dwaitpid callback - my ($lei, $pid) = @_; +sub reap_compress { # awaitpid callback + my ($pid, $lei) = @_; my $cmd = delete $lei->{"pid.$pid"}; return if $? == 0; $lei->fail("@$cmd failed", $? >> 8); @@ -195,6 +200,7 @@ sub _mbox_write_cb ($$) { sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); + ++$lei->{-nr_seen}; return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; $smsg->{-recent} = 1 if $set_recent; @@ -261,10 +267,8 @@ sub _buf2maildir ($$$$) { $rand = ''; do { $base = $rand.$common.':2,'.kw2suffix($kw); - } while (!($ok = link($tmp, $dst.$base)) && $!{EEXIST} && - ($rand = _rand.',')); - die "link($tmp, $dst$base): $!" unless $ok; - unlink($tmp) or warn "W: failed to unlink $tmp: $!\n"; + } while (!($ok = rename_noreplace($tmp, $dst.$base)) && + $!{EEXIST} && ($rand = _rand.',')); \$base; } else { my $err = "Error writing $smsg->{blob} to $dst: $!\n"; @@ -280,8 +284,9 @@ sub _maildir_write_cb ($$) { $dedupe->prepare_dedupe if $dedupe; my $dst = $lei->{ovv}->{dst}; my $lse = $lei->{lse}; # may be undef - my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef; - my $out = $sto ? 'maildir:'.$lei->abs_path($dst) : undef; + my $lms = $self->{-lms_rw}; + my $out = $lms ? 'maildir:'.$lei->abs_path($dst) : undef; + $lms->lms_write_prepare if $lms; # Favor cur/ and only write to new/ when augmenting. This # saves MUAs from having to do a mass rename when the initial @@ -290,13 +295,15 @@ sub _maildir_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run + + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml // PublicInbox::Eml->new($$bref), $smsg); $lse->xsmsg_vmd($smsg) if $lse; my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg, $dir); - $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; + $lms->set_src($smsg->oidbin, $out, $n) if $lms; ++$lei->{-nr_write}; } } @@ -306,15 +313,21 @@ sub _imap_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; my $append = $lei->{net}->can('imap_append'); - my $uri = $self->{uri}; - my $mic = $lei->{net}->mic_get($uri); + my $uri = $self->{uri} // die 'BUG: no {uri}'; + my $mic = $lei->{net}->mic_get($uri) // die <mailbox; $uri->uidvalidity($mic->uidvalidity($folder)); my $lse = $lei->{lse}; # may be undef - my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef; + my $lms = $self->{-lms_rw}; + $lms->lms_write_prepare if $lms; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $mic // return $lei->fail; # mic may be undef-ed in last run + + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml // PublicInbox::Eml->new($$bref), $smsg); @@ -325,9 +338,8 @@ sub _imap_write_cb ($$) { die $err; } # imap_append returns UID if IMAP server has UIDPLUS extension - ($sto && $uid =~ /\A[0-9]+\z/) and - $sto->wq_do('set_sync_info', - $smsg->{blob}, $$uri, $uid + 0); + ($lms && $uid =~ /\A[0-9]+\z/) and + $lms->set_src($smsg->oidbin, $$uri, $uid + 0); ++$lei->{-nr_write}; } } @@ -359,6 +371,7 @@ sub _v2_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml, $smsg); $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; @@ -414,11 +427,13 @@ sub new { require PublicInbox::LeiViewText; $lei->{lvt} = PublicInbox::LeiViewText->new($lei, $fmt); $self->{base_type} = 'text'; + $self->{-wq_nr_workers} = 1; # for pager @conflict = qw(mua save); } elsif ($fmt eq 'v2') { die "--dedupe=oid and v2 are incompatible\n" if ($lei->{opt}->{dedupe}//'') eq 'oid'; $self->{base_type} = 'v2'; + $self->{-wq_nr_workers} = 1; # v2 has shards $lei->{opt}->{save} = \1; $dst = $lei->{ovv}->{dst} = $lei->abs_path($dst); @conflict = qw(mua sort); @@ -637,9 +652,8 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub v2w_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($v2w, $lei) = @$arg; +sub v2w_done_wait { # awaitpid cb + my ($pid, $v2w, $lei) = @_; $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; } @@ -665,8 +679,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); - $v2w->wq_wait_async(\&v2w_done_wait, $lei); + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}, + \&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; @@ -740,7 +754,8 @@ sub do_post_auth { $au_peers->[1] = undef; sysread($au_peers->[0], my $barrier1, 1); } - $self->{wcb} = $self->write_cb($lei); + eval { $self->{wcb} = $self->write_cb($lei) }; + $lei->fail($@) if $@; if ($au_peers) { # wait for peer l2m to set write_cb $au_peers->[3] = undef; sysread($au_peers->[2], my $barrier2, 1); @@ -751,11 +766,14 @@ sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; $lei->_lei_atfork_child; - if (my $lse = $lei->{lse}) { - $self->{-lms_ro} = $lse->{-lms_ro} //= $lse->lms; - } $lei->{auth}->do_auth_atfork($self) if $lei->{auth}; $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + $self->{git} = $self->{lei}->{ale}->git; + $SIG{TERM} = sub { # avoid ->DESTROY ordering problems + my $git = delete $self->{git}; + $git->async_wait_all if $git; + exit(15 + 128); + }; $self->SUPER::ipc_atfork_child; } @@ -774,21 +792,25 @@ sub poke_dst { sub write_mail { # via ->wq_io_do my ($self, $smsg, $eml) = @_; - return $self->{wcb}->(undef, $smsg, $eml) if $eml; - $smsg->{-lms_ro} = $self->{-lms_ro}; - $self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail, - [$self->{wcb}, $smsg]); + if ($eml) { + eval { $self->{wcb}->(undef, $smsg, $eml) }; + $self->{lei}->fail("blob=$smsg->{blob} $@") if $@; + } else { + $smsg->{l2m} = $self; + $self->{git}->cat_async($smsg->{blob}, \&git_to_mail, $smsg); + } } sub wq_atexit_child { my ($self) = @_; local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; - delete $self->{wcb}; $lei->{ale}->git->async_wait_all; - my $nr = delete($lei->{-nr_write}) or return; + my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)}); + delete $self->{wcb}; + $nr_s or return; return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p}; - $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr); + $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s); } # runs on a 1s timer in lei-daemon