]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
lei q: parallelize Maildir and mbox writing
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index 5d4b7978de3098b6cbc347d143a4f06c7496273b..8d030227e736cc3df97a273471246736ef7e6445 100644 (file)
@@ -5,6 +5,7 @@
 package PublicInbox::LeiToMail;
 use strict;
 use v5.10.1;
+use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
@@ -14,6 +15,7 @@ use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
+use PublicInbox::Git;
 
 my %kw2char = ( # Maildir characters
        draft => 'D',
@@ -187,8 +189,9 @@ sub zsfx2cmd ($$$) {
        \@cmd;
 }
 
-sub compress_dst {
-       my ($self, $zsfx, $lei) = @_;
+sub _post_augment_mbox { # open a compressor process
+       my ($self, $lei) = @_;
+       my $zsfx = $self->{zsfx} or return;
        my $cmd = zsfx2cmd($zsfx, undef, $lei);
        pipe(my ($r, $w)) or die "pipe: $!";
        my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
@@ -209,7 +212,9 @@ sub decompress_src ($$$) {
 
 sub dup_src ($) {
        my ($in) = @_;
-       open my $dup, '+>>&', $in or die "dup: $!";
+       # fileno needed because wq_set_recv_modes only used ">&=" for {1}
+       # and Perl blindly trusts that to reject the '+' (readability flag)
+       open my $dup, '+>>&=', fileno($in) or die "dup: $!";
        $dup;
 }
 
@@ -321,11 +326,13 @@ sub new {
        } else {
                die "bad mail --format=$fmt\n";
        }
-       my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
+       $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
        $self;
 }
 
-sub _prepare_maildir {
+sub _pre_augment_maildir {} # noop
+
+sub _do_augment_maildir {
        my ($self, $lei) = @_;
        my $dst = $lei->{ovv}->{dst};
        if ($lei->{opt}->{augment}) {
@@ -338,6 +345,11 @@ sub _prepare_maildir {
        } else { # clobber existing Maildir
                _maildir_each_file($dst, \&_unlink);
        }
+}
+
+sub _post_augment_maildir {
+       my ($self, $lei) = @_;
+       my $dst = $lei->{ovv}->{dst};
        for my $x (qw(tmp new cur)) {
                my $d = $dst.$x;
                next if -d $d;
@@ -347,46 +359,95 @@ sub _prepare_maildir {
        }
 }
 
-sub _prepare_mbox {
+sub _pre_augment_mbox {
        my ($self, $lei) = @_;
        my $dst = $lei->{ovv}->{dst};
-       my ($out, $seekable);
-       if ($dst eq '/dev/stdout') {
-               $out = $lei->{1};
-       } else {
+       if ($dst ne '/dev/stdout') {
                my $mode = -p $dst ? '>' : '+>>';
                if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
                        $! == ENOENT or die "unlink($dst): $!";
                }
-               open $out, $mode, $dst or die "open($dst): $!";
-               # Perl does SEEK_END even with O_APPEND :<
-               $seekable = seek($out, 0, SEEK_SET);
-               die "seek($dst): $!\n" if !$seekable && $! != ESPIPE;
+               open my $out, $mode, $dst or die "open($dst): $!";
                $lei->{1} = $out;
        }
+       # Perl does SEEK_END even with O_APPEND :<
+       $self->{seekable} = seek($lei->{1}, 0, SEEK_SET);
+       if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') {
+               die "seek($dst): $!\n";
+       }
        state $zsfx_allow = join('|', keys %zsfx2cmd);
-       my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+       ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/);
+}
+
+sub _do_augment_mbox {
+       my ($self, $lei) = @_;
+       return if !$lei->{opt}->{augment};
        my $dedupe = $lei->{dedupe};
-       if ($lei->{opt}->{augment}) {
-               die "cannot augment $dst, not seekable\n" if !$seekable;
-               if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
-                       my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-                                       dup_src($out);
-                       my $fmt = $lei->{ovv}->{fmt};
-                       require PublicInbox::MboxReader;
-                       PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
-               }
-               # maybe some systems don't honor O_APPEND, Perl does this:
-               seek($out, 0, SEEK_END) or die "seek $dst: $!";
-               $dedupe->pause_dedupe if $dedupe;
+       my $dst = $lei->{ovv}->{dst};
+       die "cannot augment $dst, not seekable\n" if !$self->{seekable};
+       my $out = $lei->{1};
+       if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
+               my $zsfx = $self->{zsfx};
+               my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+                               dup_src($out);
+               my $fmt = $lei->{ovv}->{fmt};
+               require PublicInbox::MboxReader;
+               PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
        }
-       compress_dst($self, $zsfx, $lei) if $zsfx;
+       # maybe some systems don't honor O_APPEND, Perl does this:
+       seek($out, 0, SEEK_END) or die "seek $dst: $!";
+       $dedupe->pause_dedupe if $dedupe;
 }
 
-sub do_prepare {
+sub pre_augment { # fast (1 disk seek), runs in main daemon
        my ($self, $lei) = @_;
-       my $m = "_prepare_$self->{base_type}";
+       # _pre_augment_maildir, _pre_augment_mbox
+       my $m = "_pre_augment_$self->{base_type}";
        $self->$m($lei);
 }
 
+sub do_augment { # slow, runs in wq worker
+       my ($self, $lei) = @_;
+       # _do_augment_maildir, _do_augment_mbox
+       my $m = "_do_augment_$self->{base_type}";
+       $self->$m($lei);
+}
+
+sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
+       my ($self, $lei) = @_;
+       # _post_augment_maildir, _post_augment_mbox
+       my $m = "_post_augment_$self->{base_type}";
+       $self->$m($lei);
+}
+
+sub lock_free {
+       $_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0;
+}
+
+sub write_mail { # via ->wq_do
+       my ($self, $git_dir, $oid, $lei, $kw) = @_;
+       my $wcb = $self->{wcb} //= do { # first message
+               my %sig = $lei->atfork_child_wq($self);
+               @SIG{keys %sig} = values %sig; # not local
+               $lei->{dedupe}->prepare_dedupe;
+               $self->write_cb($lei);
+       };
+       my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
+       $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+}
+
+sub ipc_atfork_prepare {
+       my ($self) = @_;
+       # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
+       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
+       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
+sub DESTROY {
+       my ($self) = @_;
+       for my $pid_git (grep(/\A$$\0/, keys %$self)) {
+               $self->{$pid_git}->async_wait_all;
+       }
+}
+
 1;