]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiToMail.pm
lei/store: use SOCK_SEQPACKET rather than pipe
[public-inbox.git] / lib / PublicInbox / LeiToMail.pm
index 9f7171fb04bbc89110fa7d6eedde534c6419e426..a419b83f211cc8757f4988e686dab30dd9266191 100644 (file)
@@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) {
        my $c = $lse->kw_changed($eml, $kw, my $docids = []);
        my $vmd = { kw => $kw };
        if (scalar @$docids) { # already in lei/store
-               $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+               $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c;
        } elsif (my $xoids = $lei->{ale}->xoids_for($eml)) {
                # it's in an external, only set kw, here
-               $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+               $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd);
        } else { # never-before-seen, import the whole thing
                # XXX this is critical in protecting against accidental
                # data loss without --augment
-               $lei->{sto}->ipc_do('set_eml', $eml, $vmd);
+               $lei->{sto}->wq_do('set_eml', $eml, $vmd);
        }
 }
 
@@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) {
                $lse->xsmsg_vmd($smsg) if $lse;
                my $n = _buf2maildir($dst, $bref // \($eml->as_string),
                                        $smsg, $dir);
-               $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
+               $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
                ++$lei->{-nr_write};
        }
 }
@@ -326,7 +326,7 @@ sub _imap_write_cb ($$) {
                }
                # imap_append returns UID if IMAP server has UIDPLUS extension
                ($sto && $uid =~ /\A[0-9]+\z/) and
-                       $sto->ipc_do('set_sync_info',
+                       $sto->wq_do('set_sync_info',
                                        $smsg->{blob}, $$uri, $uid + 0);
                ++$lei->{-nr_write};
        }
@@ -360,7 +360,7 @@ sub _v2_write_cb ($$) {
                my ($bref, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($bref);
                return if $dedupe && $dedupe->is_dup($eml, $smsg);
-               $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add
+               $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
                ++$lei->{-nr_write};
        }
 }
@@ -658,9 +658,10 @@ sub _pre_augment_v2 {
        }
        PublicInbox::InboxWritable->new($ibx, @creat);
        $ibx->init_inbox if @creat;
-       my $v2w = $lei->{v2w} = $ibx->importer;
-       $v2w->ipc_lock_init("$dir/ipc.lock");
-       $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei });
+       my $v2w = $ibx->importer;
+       $v2w->{-wq_no_bcast} = 1;
+       $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+       $lei->{v2w} = $v2w;
        return if !$lei->{opt}->{shared};
        my $d = "$lei->{ale}->{git}->{git_dir}/objects";
        my $al = "$dir/git/0.git/objects/info/alternates";
@@ -689,7 +690,7 @@ sub do_augment { # slow, runs in wq worker
 sub post_augment {
        my ($self, $lei, @args) = @_;
        my $wait = $lei->{opt}->{'import-before'} ?
-                       $lei->{sto}->ipc_do('checkpoint', 1) : 0;
+                       $lei->{sto}->wq_do('checkpoint', 1) : 0;
        # _post_augment_mbox
        my $m = $self->can("_post_augment_$self->{base_type}") or return;
        $m->($self, $lei, @args);
@@ -774,6 +775,7 @@ sub write_mail { # via ->wq_io_do
 
 sub wq_atexit_child {
        my ($self) = @_;
+       local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
        my $lei = $self->{lei};
        delete $self->{wcb};
        $lei->{ale}->git->async_wait_all;