X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiToMail.pm;h=31eba794066a9871a9e6b80567f1e3c296d16650;hb=refs%2Fheads%2Fmaster;hp=3c5e7e59e8eedf3b2108a70a076e0ea83d18509c;hpb=258365e6cd56f9039b4de7723e4950563a163c8b;p=public-inbox.git diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 3c5e7e59..31eba794 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Writes PublicInbox::Eml objects atomically to a mbox variant or Maildir @@ -132,23 +132,26 @@ sub eml2mboxcl2 { } sub git_to_mail { # git->cat_async callback - my ($bref, $oid, $type, $size, $arg) = @_; + my ($bref, $oid, $type, $size, $smsg) = @_; + my $self = delete $smsg->{l2m} // die "BUG: no l2m"; $type // return; # called by git->async_abort - my ($write_cb, $smsg) = @$arg; - if ($type eq 'missing' && $smsg->{-lms_rw}) { - if ($bref = $smsg->{-lms_rw}->local_blob($oid, 1)) { + eval { + if ($type eq 'missing' && + ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { $type = 'blob'; $size = length($$bref); } - } - return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob'; - return warn("E: $oid is empty\n") unless $size; - die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid; - $write_cb->($bref, $smsg); + $type eq 'blob' or return $self->{lei}->child_error(1, + "W: $oid is $type (!= blob)"); + $size or return $self->{lei}->child_error(1,"E: $oid is empty"); + $smsg->{blob} eq $oid or die "BUG: expected=$smsg->{blob}"; + $self->{wcb}->($bref, $smsg); + }; + $self->{lei}->fail("$@ (oid=$oid)") if $@; } -sub reap_compress { # dwaitpid callback - my ($lei, $pid) = @_; +sub reap_compress { # awaitpid callback + my ($pid, $lei) = @_; my $cmd = delete $lei->{"pid.$pid"}; return if $? == 0; $lei->fail("@$cmd failed", $? >> 8); @@ -197,6 +200,7 @@ sub _mbox_write_cb ($$) { sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); + ++$lei->{-nr_seen}; return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; $smsg->{-recent} = 1 if $set_recent; @@ -291,6 +295,8 @@ sub _maildir_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run + + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml // PublicInbox::Eml->new($$bref), $smsg); @@ -307,8 +313,11 @@ sub _imap_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; my $append = $lei->{net}->can('imap_append'); - my $uri = $self->{uri}; - my $mic = $lei->{net}->mic_get($uri); + my $uri = $self->{uri} // die 'BUG: no {uri}'; + my $mic = $lei->{net}->mic_get($uri) // die <mailbox; $uri->uidvalidity($mic->uidvalidity($folder)); my $lse = $lei->{lse}; # may be undef @@ -317,6 +326,8 @@ sub _imap_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $mic // return $lei->fail; # mic may be undef-ed in last run + + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml // PublicInbox::Eml->new($$bref), $smsg); @@ -360,6 +371,7 @@ sub _v2_write_cb ($$) { sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); + ++$lei->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml, $smsg); $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; @@ -640,9 +652,8 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub v2w_done_wait { # dwaitpid callback - my ($arg, $pid) = @_; - my ($v2w, $lei) = @$arg; +sub v2w_done_wait { # awaitpid cb + my ($pid, $v2w, $lei) = @_; $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; } @@ -668,8 +679,8 @@ sub _pre_augment_v2 { PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; my $v2w = $ibx->importer; - $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); - $v2w->wq_wait_async(\&v2w_done_wait, $lei); + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}, + \&v2w_done_wait, $lei); $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; @@ -743,7 +754,8 @@ sub do_post_auth { $au_peers->[1] = undef; sysread($au_peers->[0], my $barrier1, 1); } - $self->{wcb} = $self->write_cb($lei); + eval { $self->{wcb} = $self->write_cb($lei) }; + $lei->fail($@) if $@; if ($au_peers) { # wait for peer l2m to set write_cb $au_peers->[3] = undef; sysread($au_peers->[2], my $barrier2, 1); @@ -780,21 +792,25 @@ sub poke_dst { sub write_mail { # via ->wq_io_do my ($self, $smsg, $eml) = @_; - return $self->{wcb}->(undef, $smsg, $eml) if $eml; - $smsg->{-lms_rw} = $self->{-lms_rw}; - $self->{git}->cat_async($smsg->{blob}, \&git_to_mail, - [$self->{wcb}, $smsg]); + if ($eml) { + eval { $self->{wcb}->(undef, $smsg, $eml) }; + $self->{lei}->fail("blob=$smsg->{blob} $@") if $@; + } else { + $smsg->{l2m} = $self; + $self->{git}->cat_async($smsg->{blob}, \&git_to_mail, $smsg); + } } sub wq_atexit_child { my ($self) = @_; local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; - delete $self->{wcb}; $lei->{ale}->git->async_wait_all; - my $nr = delete($lei->{-nr_write}) or return; + my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)}); + delete $self->{wcb}; + $nr_s or return; return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p}; - $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr); + $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s); } # runs on a 1s timer in lei-daemon