X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=31eba794066a9871a9e6b80567f1e3c296d16650;hb=refs%2Fheads%2Fmaster;hp=83f58a29405b4b490d1037c1d8d311742546c943;hpb=54b0c90cff0c29040807b0073b71ad1e5467afbc;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 83f58a29..31eba794 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Writes PublicInbox::Eml objects atomically to a mbox variant or Maildir @@ -132,23 +132,26 @@ sub eml2mboxcl2 { } sub git_to_mail { # git->cat_async callback - my ($bref, $oid, $type, $size, $arg) = @_; + my ($bref, $oid, $type, $size, $smsg) = @_; + my $self = delete $smsg->{l2m} // die "BUG: no l2m"; $type // return; # called by git->async_abort - my ($write_cb, $smsg) = @$arg; - if ($type eq 'missing' && $smsg->{-lms_rw}) { - if ($bref = $smsg->{-lms_rw}->local_blob($oid, 1)) { + eval { + if ($type eq 'missing' && + ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { $type = 'blob'; $size = length($$bref); } - } - return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob'; - return warn("E: $oid is empty\n") unless $size; - die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid; - $write_cb->($bref, $smsg); + $type eq 'blob' or return $self->{lei}->child_error(1, + "W: $oid is $type (!= blob)"); + $size or return $self->{lei}->child_error(1,"E: $oid is empty"); + $smsg->{blob} eq $oid or die "BUG: expected=$smsg->{blob}"; + $self->{wcb}->($bref, $smsg); + }; + $self->{lei}->fail("$@ (oid=$oid)") if $@; } -sub reap_compress { # dwaitpid callback - my ($lei, $pid) = @_; +sub reap_compress { # awaitpid callback + my ($pid, $lei) = @_; my $cmd = delete $lei->{"pid.$pid"}; return if $? == 0; $lei->fail("@$cmd failed", $? >> 8); @@ -197,6 +200,7 @@ sub _mbox_write_cb ($$) { sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); + ++$lei->{-nr_seen}; return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; $smsg->{-recent} = 1 if $set_recent; @@ -291,6 +295,8 @@ sub _maildir_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run + + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml // PublicInbox::Eml->new($$bref), $smsg); @@ -307,8 +313,11 @@ sub _imap_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; my $append = $lei->{net}->can('imap_append'); - my $uri = $self->{uri}; - my $mic = $lei->{net}->mic_get($uri); + my $uri = $self->{uri} // die 'BUG: no {uri}'; + my $mic = $lei->{net}->mic_get($uri) // die <mailbox; $uri->uidvalidity($mic->uidvalidity($folder)); my $lse = $lei->{lse}; # may be undef @@ -317,6 +326,8 @@ sub _imap_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $mic // return $lei->fail; # mic may be undef-ed in last run + + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml // PublicInbox::Eml->new($$bref), $smsg); @@ -360,6 +371,7 @@ sub _v2_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml, $smsg); $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; @@ -415,11 +427,13 @@ sub new { require PublicInbox::LeiViewText; $lei->{lvt} = PublicInbox::LeiViewText->new($lei, $fmt); $self->{base_type} = 'text'; + $self->{-wq_nr_workers} = 1; # for pager @conflict = qw(mua save); } elsif ($fmt eq 'v2') { die "--dedupe=oid and v2 are incompatible\n" if ($lei->{opt}->{dedupe}//'') eq 'oid'; $self->{base_type} = 'v2'; + $self->{-wq_nr_workers} = 1; # v2 has shards $lei->{opt}->{save} = \1; $dst = $lei->{ovv}->{dst} = $lei->abs_path($dst); @conflict = qw(mua sort); @@ -638,9 +652,8 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub v2w_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($v2w, $lei) = @$arg; +sub v2w_done_wait { # awaitpid cb + my ($pid, $v2w, $lei) = @_; $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; } @@ -666,8 +679,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); - $v2w->wq_wait_async(\&v2w_done_wait, $lei); + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}, + \&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; @@ -741,7 +754,8 @@ sub do_post_auth { $au_peers->[1] = undef; sysread($au_peers->[0], my $barrier1, 1); } - $self->{wcb} = $self->write_cb($lei); + eval { $self->{wcb} = $self->write_cb($lei) }; + $lei->fail($@) if $@; if ($au_peers) { # wait for peer l2m to set write_cb $au_peers->[3] = undef; sysread($au_peers->[2], my $barrier2, 1); @@ -754,6 +768,12 @@ sub ipc_atfork_child { $lei->_lei_atfork_child; $lei->{auth}->do_auth_atfork($self) if $lei->{auth}; $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); + $self->{git} = $self->{lei}->{ale}->git; + $SIG{TERM} = sub { # avoid ->DESTROY ordering problems + my $git = delete $self->{git}; + $git->async_wait_all if $git; + exit(15 + 128); + }; $self->SUPER::ipc_atfork_child; } @@ -772,21 +792,25 @@ sub poke_dst { sub write_mail { # via ->wq_io_do my ($self, $smsg, $eml) = @_; - return $self->{wcb}->(undef, $smsg, $eml) if $eml; - $smsg->{-lms_rw} = $self->{-lms_rw}; - $self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail, - [$self->{wcb}, $smsg]); + if ($eml) { + eval { $self->{wcb}->(undef, $smsg, $eml) }; + $self->{lei}->fail("blob=$smsg->{blob} $@") if $@; + } else { + $smsg->{l2m} = $self; + $self->{git}->cat_async($smsg->{blob}, \&git_to_mail, $smsg); + } } sub wq_atexit_child { my ($self) = @_; local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; - delete $self->{wcb}; $lei->{ale}->git->async_wait_all; - my $nr = delete($lei->{-nr_write}) or return; + my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)}); + delete $self->{wcb}; + $nr_s or return; return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p}; - $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr); + $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s); } # runs on a 1s timer in lei-daemon