]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
dsdeflate: shorten scope of initial buffer
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index d33d27aec00646879c89f702b4c3610775f47b09..2aa3977e8034d01f744c05a3a7c5598c9e97890e 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Writes PublicInbox::Eml objects atomically to a mbox variant or Maildir
@@ -135,8 +135,8 @@ sub git_to_mail { # git->cat_async callback
        my ($bref, $oid, $type, $size, $arg) = @_;
        $type // return; # called by git->async_abort
        my ($write_cb, $smsg) = @$arg;
-       if ($type eq 'missing' && $smsg->{-lms_ro}) {
-               if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) {
+       if ($type eq 'missing' && $smsg->{-lms_rw}) {
+               if ($bref = $smsg->{-lms_rw}->local_blob($oid, 1)) {
                        $type = 'blob';
                        $size = length($$bref);
                }
@@ -197,6 +197,7 @@ sub _mbox_write_cb ($$) {
        sub { # for git_to_mail
                my ($buf, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($buf);
+               ++$lei->{-nr_seen};
                return if $dedupe->is_dup($eml, $smsg);
                $lse->xsmsg_vmd($smsg) if $lse;
                $smsg->{-recent} = 1 if $set_recent;
@@ -280,8 +281,9 @@ sub _maildir_write_cb ($$) {
        $dedupe->prepare_dedupe if $dedupe;
        my $dst = $lei->{ovv}->{dst};
        my $lse = $lei->{lse}; # may be undef
-       my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
-       my $out = $sto ? 'maildir:'.$lei->abs_path($dst) : undef;
+       my $lms = $self->{-lms_rw};
+       my $out = $lms ? 'maildir:'.$lei->abs_path($dst) : undef;
+       $lms->lms_write_prepare if $lms;
 
        # Favor cur/ and only write to new/ when augmenting.  This
        # saves MUAs from having to do a mass rename when the initial
@@ -290,13 +292,15 @@ sub _maildir_write_cb ($$) {
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $dst // return $lei->fail; # dst may be undef-ed in last run
+
+               ++$lei->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml //
                                                PublicInbox::Eml->new($$bref),
                                                $smsg);
                $lse->xsmsg_vmd($smsg) if $lse;
                my $n = _buf2maildir($dst, $bref // \($eml->as_string),
                                        $smsg, $dir);
-               $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
+               $lms->set_src($smsg->oidbin, $out, $n) if $lms;
                ++$lei->{-nr_write};
        }
 }
@@ -311,10 +315,13 @@ sub _imap_write_cb ($$) {
        my $folder = $uri->mailbox;
        $uri->uidvalidity($mic->uidvalidity($folder));
        my $lse = $lei->{lse}; # may be undef
-       my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
+       my $lms = $self->{-lms_rw};
+       $lms->lms_write_prepare if $lms;
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $mic // return $lei->fail; # mic may be undef-ed in last run
+
+               ++$lei->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml //
                                                PublicInbox::Eml->new($$bref),
                                                $smsg);
@@ -325,9 +332,8 @@ sub _imap_write_cb ($$) {
                        die $err;
                }
                # imap_append returns UID if IMAP server has UIDPLUS extension
-               ($sto && $uid =~ /\A[0-9]+\z/) and
-                       $sto->wq_do('set_sync_info',
-                                       $smsg->{blob}, $$uri, $uid + 0);
+               ($lms && $uid =~ /\A[0-9]+\z/) and
+                       $lms->set_src($smsg->oidbin, $$uri, $uid + 0);
                ++$lei->{-nr_write};
        }
 }
@@ -359,6 +365,7 @@ sub _v2_write_cb ($$) {
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($bref);
+               ++$lei->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml, $smsg);
                $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
                ++$lei->{-nr_write};
@@ -414,11 +421,13 @@ sub new {
                require PublicInbox::LeiViewText;
                $lei->{lvt} = PublicInbox::LeiViewText->new($lei, $fmt);
                $self->{base_type} = 'text';
+               $self->{-wq_nr_workers} = 1; # for pager
                @conflict = qw(mua save);
        } elsif ($fmt eq 'v2') {
                die "--dedupe=oid and v2 are incompatible\n" if
                        ($lei->{opt}->{dedupe}//'') eq 'oid';
                $self->{base_type} = 'v2';
+               $self->{-wq_nr_workers} = 1; # v2 has shards
                $lei->{opt}->{save} = \1;
                $dst = $lei->{ovv}->{dst} = $lei->abs_path($dst);
                @conflict = qw(mua sort);
@@ -751,11 +760,14 @@ sub ipc_atfork_child {
        my ($self) = @_;
        my $lei = $self->{lei};
        $lei->_lei_atfork_child;
-       if (my $lse = $lei->{lse}) {
-               $self->{-lms_ro} = $lse->{-lms_ro} //= $lse->lms;
-       }
        $lei->{auth}->do_auth_atfork($self) if $lei->{auth};
        $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+       $self->{git} = $self->{lei}->{ale}->git;
+       $SIG{TERM} = sub { # avoid ->DESTROY ordering problems
+               my $git = delete $self->{git};
+               $git->async_wait_all if $git;
+               exit(15 + 128);
+       };
        $self->SUPER::ipc_atfork_child;
 }
 
@@ -775,8 +787,8 @@ sub poke_dst {
 sub write_mail { # via ->wq_io_do
        my ($self, $smsg, $eml) = @_;
        return $self->{wcb}->(undef, $smsg, $eml) if $eml;
-       $smsg->{-lms_ro} = $self->{-lms_ro};
-       $self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail,
+       $smsg->{-lms_rw} = $self->{-lms_rw};
+       $self->{git}->cat_async($smsg->{blob}, \&git_to_mail,
                                [$self->{wcb}, $smsg]);
 }
 
@@ -786,9 +798,10 @@ sub wq_atexit_child {
        my $lei = $self->{lei};
        delete $self->{wcb};
        $lei->{ale}->git->async_wait_all;
-       my $nr = delete($lei->{-nr_write}) or return;
+       my ($nr_w, $nr_s) = delete(@$lei{qw(-nr_write -nr_seen)});
+       $nr_s or return;
        return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
-       $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr);
+       $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr_w, $nr_s);
 }
 
 # runs on a 1s timer in lei-daemon