]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
imap+nntp: share COMPRESS implementation
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index e7220bf02954f3c7c5382e42f7d0f35e6e7df266..2aa3977e8034d01f744c05a3a7c5598c9e97890e 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Writes PublicInbox::Eml objects atomically to a mbox variant or Maildir
@@ -197,6 +197,7 @@ sub _mbox_write_cb ($$) {
        sub { # for git_to_mail
                my ($buf, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($buf);
+               ++$lei->{-nr_seen};
                return if $dedupe->is_dup($eml, $smsg);
                $lse->xsmsg_vmd($smsg) if $lse;
                $smsg->{-recent} = 1 if $set_recent;
@@ -282,6 +283,7 @@ sub _maildir_write_cb ($$) {
        my $lse = $lei->{lse}; # may be undef
        my $lms = $self->{-lms_rw};
        my $out = $lms ? 'maildir:'.$lei->abs_path($dst) : undef;
+       $lms->lms_write_prepare if $lms;
 
        # Favor cur/ and only write to new/ when augmenting.  This
        # saves MUAs from having to do a mass rename when the initial
@@ -290,6 +292,8 @@ sub _maildir_write_cb ($$) {
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $dst // return $lei->fail; # dst may be undef-ed in last run
+
+               ++$lei->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml //
                                                PublicInbox::Eml->new($$bref),
                                                $smsg);
@@ -312,9 +316,12 @@ sub _imap_write_cb ($$) {
        $uri->uidvalidity($mic->uidvalidity($folder));
        my $lse = $lei->{lse}; # may be undef
        my $lms = $self->{-lms_rw};
+       $lms->lms_write_prepare if $lms;
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $mic // return $lei->fail; # mic may be undef-ed in last run
+
+               ++$lei->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml //
                                                PublicInbox::Eml->new($$bref),
                                                $smsg);
@@ -358,6 +365,7 @@ sub _v2_write_cb ($$) {
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($bref);
+               ++$lei->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml, $smsg);
                $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
                ++$lei->{-nr_write};
@@ -413,11 +421,13 @@ sub new {
                require PublicInbox::LeiViewText;
                $lei->{lvt} = PublicInbox::LeiViewText->new($lei, $fmt);
                $self->{base_type} = 'text';
+               $self->{-wq_nr_workers} = 1; # for pager
                @conflict = qw(mua save);
        } elsif ($fmt eq 'v2') {
                die "--dedupe=oid and v2 are incompatible\n" if
                        ($lei->{opt}->{dedupe}//'') eq 'oid';
                $self->{base_type} = 'v2';
+               $self->{-wq_nr_workers} = 1; # v2 has shards
                $lei->{opt}->{save} = \1;
                $dst = $lei->{ovv}->{dst} = $lei->abs_path($dst);
                @conflict = qw(mua sort);
@@ -750,9 +760,14 @@ sub ipc_atfork_child {
        my ($self) = @_;
        my $lei = $self->{lei};
        $lei->_lei_atfork_child;
-       $self->{-lms_rw}->lms_write_prepare if $self->{-lms_rw};
        $lei->{auth}->do_auth_atfork($self) if $lei->{auth};
        $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+       $self->{git} = $self->{lei}->{ale}->git;
+       $SIG{TERM} = sub { # avoid ->DESTROY ordering problems
+               my $git = delete $self->{git};
+               $git->async_wait_all if $git;
+               exit(15 + 128);
+       };
        $self->SUPER::ipc_atfork_child;
 }
 
@@ -773,7 +788,7 @@ sub write_mail { # via ->wq_io_do
        my ($self, $smsg, $eml) = @_;
        return $self->{wcb}->(undef, $smsg, $eml) if $eml;
        $smsg->{-lms_rw} = $self->{-lms_rw};
-       $self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail,
+       $self->{git}->cat_async($smsg->{blob}, \&git_to_mail,
                                [$self->{wcb}, $smsg]);
 }
 
@@ -783,9 +798,10 @@ sub wq_atexit_child {
        my $lei = $self->{lei};
        delete $self->{wcb};
        $lei->{ale}->git->async_wait_all;
-       my $nr = delete($lei->{-nr_write}) or return;
+       my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)});
+       $nr_s or return;
        return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
-       $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr);
+       $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s);
 }
 
 # runs on a 1s timer in lei-daemon