]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
ipc: switch wq to use the event loop
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index 43c59da085e0e557df1a180780d5717e8af17a4b..1f6c2a3b69da18265cfc45ef9173d341f467eae5 100644 (file)
@@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use PublicInbox::OnDestroy;
+use PublicInbox::Git;
+use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
-use PublicInbox::Git;
+
+# struggles with short-lived repos, Gcf2Client makes little sense with lei;
+# but we may use in-process libgit2 in the future.
+$PublicInbox::GitAsyncCat::GCF2C = 0;
 
 my %kw2char = ( # Maildir characters
        draft => 'D',
@@ -227,9 +232,7 @@ sub decompress_src ($$$) {
 
 sub dup_src ($) {
        my ($in) = @_;
-       # fileno needed because wq_set_recv_modes only used ">&=" for {1}
-       # and Perl blindly trusts that to reject the '+' (readability flag)
-       open my $dup, '+>>&=', fileno($in) or die "dup: $!";
+       open my $dup, '+>>&', $in or die "dup: $!";
        $dup;
 }
 
@@ -325,7 +328,7 @@ sub _buf2maildir {
 sub _maildir_write_cb ($$) {
        my ($self, $lei) = @_;
        my $dedupe = $lei->{dedupe};
-       $dedupe->prepare_dedupe;
+       $dedupe->prepare_dedupe if $dedupe;
        my $dst = $lei->{ovv}->{dst};
        sub { # for git_to_mail
                my ($buf, $smsg, $eml) = @_;
@@ -405,6 +408,7 @@ sub _pre_augment_mbox {
                        $! == ENOENT or die "unlink($dst): $!";
                }
                open my $out, $mode, $dst or die "open($dst): $!";
+               $lei->{old_1} = $lei->{1};
                $lei->{1} = $out;
        }
        # Perl does SEEK_END even with O_APPEND :<
@@ -461,39 +465,25 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 
 sub write_mail { # via ->wq_do
        my ($self, $git_dir, $smsg, $lei) = @_;
-       my $not_done = delete $self->{4}; # write end of {each_smsg_done}
+       my $not_done = delete $self->{$lei->{each_smsg_not_done}};
        my $wcb = $self->{wcb} //= do { # first message
                my %sig = $lei->atfork_child_wq($self);
                @SIG{keys %sig} = values %sig; # not local
-               $lei->{dedupe}->prepare_dedupe;
                $self->write_cb($lei);
        };
        my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-       $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
-}
-
-sub ipc_atfork_prepare {
-       my ($self) = @_;
-       # (done_wr, stdout|mbox, stderr, 3: sock, 4: each_smsg_done_wr)
-       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= >&=]);
-       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+       git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+                               [$wcb, $smsg, $not_done]);
 }
 
-# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
-# ordering is unstable at worker exit and may cause segfaults
-sub reap_gits {
+sub wq_atexit_child {
        my ($self) = @_;
+       delete $self->{wcb};
        for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
                $git->async_wait_all;
        }
-}
-
-sub ipc_atfork_child { # runs after IPC::wq_worker_loop
-       my ($self) = @_;
-       $self->SUPER::ipc_atfork_child;
-       # reap_gits needs to run before $self->DESTROY,
-       # IPC.pm will ensure that.
-       PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
+       $SIG{__WARN__} = 'DEFAULT';
+       $SIG{PIPE} = 'DEFAULT';
 }
 
 1;