]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
ipc: switch wq to use the event loop
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index 61b546b5ec8ae13892ba5070666353b845eaba62..1f6c2a3b69da18265cfc45ef9173d341f467eae5 100644 (file)
@@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use PublicInbox::OnDestroy;
+use PublicInbox::Git;
+use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
-use PublicInbox::Git;
+
+# struggles with short-lived repos, Gcf2Client makes little sense with lei;
+# but we may use in-process libgit2 in the future.
+$PublicInbox::GitAsyncCat::GCF2C = 0;
 
 my %kw2char = ( # Maildir characters
        draft => 'D',
@@ -323,7 +328,7 @@ sub _buf2maildir {
 sub _maildir_write_cb ($$) {
        my ($self, $lei) = @_;
        my $dedupe = $lei->{dedupe};
-       $dedupe->prepare_dedupe;
+       $dedupe->prepare_dedupe if $dedupe;
        my $dst = $lei->{ovv}->{dst};
        sub { # for git_to_mail
                my ($buf, $smsg, $eml) = @_;
@@ -464,31 +469,21 @@ sub write_mail { # via ->wq_do
        my $wcb = $self->{wcb} //= do { # first message
                my %sig = $lei->atfork_child_wq($self);
                @SIG{keys %sig} = values %sig; # not local
-               $lei->{dedupe}->prepare_dedupe;
                $self->write_cb($lei);
        };
        my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-       $git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
+       git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+                               [$wcb, $smsg, $not_done]);
 }
 
-# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
-# ordering is unstable at worker exit and may cause segfaults
-sub reap_gits {
+sub wq_atexit_child {
        my ($self) = @_;
        delete $self->{wcb};
        for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
                $git->async_wait_all;
        }
-}
-
-sub DESTROY { delete $_[0]->{wcb} }
-
-sub ipc_atfork_child { # runs after IPC::wq_worker_loop
-       my ($self) = @_;
-       $self->SUPER::ipc_atfork_child;
-       # reap_gits needs to run before $self->DESTROY,
-       # IPC.pm will ensure that.
-       PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
+       $SIG{__WARN__} = 'DEFAULT';
+       $SIG{PIPE} = 'DEFAULT';
 }
 
 1;