]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
ipc: wq_do => wq_io_do
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index c704dc2a111399c19585f7e1be7b4ef7d5b5cedc..3f65e9e99f5d1f9ac0a2250adb31a934d9f19532 100644 (file)
@@ -211,10 +211,10 @@ sub zsfx2cmd ($$$) {
 }
 
 sub _post_augment_mbox { # open a compressor process
-       my ($self, $lei, $zpipe) = @_;
+       my ($self, $lei) = @_;
        my $zsfx = $self->{zsfx} or return;
        my $cmd = zsfx2cmd($zsfx, undef, $lei);
-       my ($r, $w) = splice(@$zpipe, 0, 2);
+       my ($r, $w) = @{delete $lei->{zpipe}};
        my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
        my $pid = spawn($cmd, $lei->{env}, $rdr);
        my $pp = gensym;
@@ -365,6 +365,7 @@ sub new {
        } else {
                die "bad mail --format=$fmt\n";
        }
+       $self->{dst} = $dst;
        $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
        $self;
 }
@@ -407,7 +408,7 @@ sub _pre_augment_mbox {
                        $! == ENOENT or die "unlink($dst): $!";
                }
                open my $out, $mode, $dst or die "open($dst): $!";
-               $lei->{old_1} = $lei->{1};
+               $lei->{old_1} = $lei->{1}; # keep for spawning MUA
                $lei->{1} = $out;
        }
        # Perl does SEEK_END even with O_APPEND :<
@@ -418,7 +419,7 @@ sub _pre_augment_mbox {
        state $zsfx_allow = join('|', keys %zsfx2cmd);
        ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/) or return;
        pipe(my ($r, $w)) or die "pipe: $!";
-       [ $r, $w ];
+       $lei->{zpipe} = [ $r, $w ];
 }
 
 sub _do_augment_mbox {
@@ -462,16 +463,36 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
        $self->$m($lei, @args);
 }
 
-sub write_mail { # via ->wq_do
-       my ($self, $git_dir, $smsg, $lei) = @_;
-       my $not_done = delete $self->{0} // die 'BUG: $not_done missing';
-       my $wcb = $self->{wcb} //= do { # first message
-               $lei->atfork_child_wq($self);
-               $self->write_cb($lei);
-       };
+sub ipc_atfork_child {
+       my ($self) = @_;
+       my $lei = delete $self->{lei};
+       $lei->lei_atfork_child;
+       if (my $zpipe = delete $lei->{zpipe}) {
+               $lei->{1} = $zpipe->[1];
+               close $zpipe->[0];
+       }
+       $self->{wcb} = $self->write_cb($lei);
+       $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+       $self->SUPER::ipc_atfork_child;
+}
+
+sub lock_free {
+       $_[0]->{base_type} =~ /\A(?:maildir|mh|imap|jmap)\z/ ? 1 : 0;
+}
+
+sub poke_dst {
+       my ($self) = @_;
+       if ($self->{base_type} eq 'maildir') {
+               my $t = time + 1;
+               utime($t, $t, "$self->{dst}/cur");
+       }
+}
+
+sub write_mail { # via ->wq_io_do
+       my ($self, $git_dir, $smsg) = @_;
        my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
        git_async_cat($git, $smsg->{blob}, \&git_to_mail,
-                               [$wcb, $smsg, $not_done]);
+                               [$self->{wcb}, $smsg]);
 }
 
 sub wq_atexit_child {