]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei/store: use SOCK_SEQPACKET rather than pipe
authorEric Wong <e@80x24.org>
Sun, 19 Sep 2021 12:50:22 +0000 (12:50 +0000)
committerEric Wong <e@80x24.org>
Sun, 19 Sep 2021 19:52:45 +0000 (19:52 +0000)
This has several advantages:

* no need to use ipc.lock to protect a pipe for non-atomic writes

* ability to pass FDs.  In another commit, this will let us
  simplify lei->sto_done_request and pass newly-created
  sockets to lei/store directly.

disadvantages:

- an extra pipe is required for rare messages over several
  hundred KB, this is probably a non-issue, though

The performance delta is unknown, but I expect shards
(which remain pipes) to be the primary bottleneck IPC-wise
for lei/store.

12 files changed:
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiImport.pm
lib/PublicInbox/LeiImportKw.pm
lib/PublicInbox/LeiIndex.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiNoteEvent.pm
lib/PublicInbox/LeiRemote.pm
lib/PublicInbox/LeiRm.pm
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiTag.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm

index 8b0614f27980f8e4e53f31681562ae25cc83b119..549b855b79abc838d29a39474f56c8c7dbd82283 100644 (file)
@@ -1501,9 +1501,9 @@ sub sto_done_request { # only call this from lei-daemon process (not workers)
        eval {
                if ($sock //= $lei->{sock}) { # issue, async wait
                        $LIVE_SOCK{"$sock"} = $sock;
-                       $lei->{sto}->ipc_do('done', "$sock");
+                       $lei->{sto}->wq_do('done', "$sock");
                } else { # forcibly wait
-                       my $wait = $lei->{sto}->ipc_do('done');
+                       my $wait = $lei->{sto}->wq_do('done');
                }
        };
        $lei->err($@) if $@;
index 9084d771cf521cefd8b64c738f0ceec07020bca5..40530914ead73ba1847f73eeb096db46efa88d65 100644 (file)
@@ -16,7 +16,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
        if (my $all_vmd = $self->{all_vmd}) {
                @$vmd{keys %$all_vmd} = values %$all_vmd;
        }
-       $self->{lei}->{sto}->ipc_do('set_eml', $eml, $vmd, $xoids);
+       $self->{lei}->{sto}->wq_do('set_eml', $eml, $vmd, $xoids);
 }
 
 sub input_mbox_cb { # MboxReader callback
index 402125cf5a43a4b1d1cb0cc0988367313a310907..2863d17f829e7f86e964253b6dc00af4d1bf88b2 100644 (file)
@@ -37,7 +37,7 @@ sub ck_update_kw { # via wq_io_do
        $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
        $self->{verbose} and
                $self->{lei}->qerr('# '.unpack('H*', $oidbin)." => @$kw\n");
-       $self->{sto}->ipc_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
+       $self->{sto}->wq_do('set_eml_vmd', undef, { kw => $kw }, \@docids);
 }
 
 sub ikw_done_wait {
index 1b327a2c1bc81904928e818893c8c929abac0fee..b3f3e1a0c51ae68f7f948754a3243083ca8680d8 100644 (file)
@@ -16,7 +16,7 @@ sub input_eml_cb { # used by input_maildir_cb and input_net_cb
        if (my $all_vmd = $self->{all_vmd}) {
                @$vmd{keys %$all_vmd} = values %$all_vmd;
        }
-       $self->{lei}->{sto}->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+       $self->{lei}->{sto}->wq_do('index_eml_only', $eml, $vmd, $xoids);
 }
 
 sub input_fh { # overrides PublicInbox::LeiInput::input_fh
index fe736981ba6565d111871f6778b11838d1ecf83d..22bedba6e6c3acf2814e23987a0922895e46ab6e 100644 (file)
@@ -378,7 +378,7 @@ sub process_inputs {
        }
        # always commit first, even on error partial work is acceptable for
        # lei <import|tag|convert>
-       my $wait = $self->{lei}->{sto}->ipc_do('done') if $self->{lei}->{sto};
+       my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto};
        $self->{lei}->fail($err) if $err;
 }
 
index 18313359378d0bc79bc6406b28d4265947214a43..5f692e754b5d7782ffe439fcc04f81467925d743 100644 (file)
@@ -36,18 +36,18 @@ sub eml_event ($$$$) {
        my ($self, $eml, $vmd, $state) = @_;
        my $sto = $self->{lei}->{sto};
        if ($state =~ /\Aimport-(?:rw|ro)\z/) {
-               $sto->ipc_do('set_eml', $eml, $vmd);
+               $sto->wq_do('set_eml', $eml, $vmd);
        } elsif ($state =~ /\Aindex-(?:rw|ro)\z/) {
                my $xoids = $self->{lei}->ale->xoids_for($eml);
-               $sto->ipc_do('index_eml_only', $eml, $vmd, $xoids);
+               $sto->wq_do('index_eml_only', $eml, $vmd, $xoids);
        } elsif ($state =~ /\Atag-(?:rw|ro)\z/) {
                my $docids = [];
                my $c = $self->{lse}->kw_changed($eml, $vmd->{kw}, $docids);
                if (scalar @$docids) { # already in lei/store
-                       $sto->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+                       $sto->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c;
                } elsif (my $xoids = $self->{lei}->ale->xoids_for($eml)) {
                        # it's in an external, only set kw, here
-                       $sto->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+                       $sto->wq_do('set_xvmd', $xoids, $eml, $vmd);
                } # else { totally unknown: ignore
        } else {
                warn "unknown state: $state (in $self->{lei}->{cfg}->{'-f'})\n";
index 8d4ffed0c03b9bc6f75bc64a784f34165b71c107..346aa6a40132292c69681139ed0e0f89b11955b8 100644 (file)
@@ -28,7 +28,7 @@ sub _each_mboxrd_eml { # callback for MboxReader->mboxrd
        my $xoids = $lei->{ale}->xoids_for($eml, 1);
        my $smsg = bless {}, 'PublicInbox::Smsg';
        if ($lei->{sto} && !$xoids) { # memoize locally
-               my $res = $lei->{sto}->ipc_do('add_eml', $eml);
+               my $res = $lei->{sto}->wq_do('add_eml', $eml);
                $smsg = $res if ref($res) eq ref($smsg);
        }
        $smsg->{blob} //= $xoids ? (keys(%$xoids))[0]
@@ -56,7 +56,7 @@ sub mset {
        my $err = waitpid($pid, 0) == $pid ? undef
                                        : "BUG: waitpid($cmd): $!";
        @$reap = (); # cancel OnDestroy
-       my $wait = $self->{lei}->{sto}->ipc_do('done');
+       my $wait = $self->{lei}->{sto}->wq_do('done');
        die $err if $err;
        $self; # we are the mset (and $ibx, and $self)
 }
index 3371f3ed00e469f66233567ff50c782532514a92..97b1c5c15dc4453205f9e9b6244486a2ba64d70e 100644 (file)
@@ -10,7 +10,7 @@ use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
 
 sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
        my ($self, $eml) = @_;
-       $self->{lei}->{sto}->ipc_do('remove_eml', $eml);
+       $self->{lei}->{sto}->wq_do('remove_eml', $eml);
 }
 
 sub input_mbox_cb { # MboxReader callback
index 08add8f5a05496ab234f5cfad9737903de0c009a..4ec6369974d53724dad337c428730543740dd999 100644 (file)
@@ -552,6 +552,12 @@ sub ipc_atfork_child {
        $self->SUPER::ipc_atfork_child;
 }
 
+sub recv_and_run {
+       my ($self, @args) = @_;
+       local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
+       $self->SUPER::recv_and_run(@args);
+}
+
 sub write_prepare {
        my ($self, $lei) = @_;
        $lei // die 'BUG: $lei not passed';
@@ -560,14 +566,14 @@ sub write_prepare {
                require PublicInbox::PktOp;
                my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
                my $dir = $lei->store_path;
-               $self->ipc_lock_init("$dir/ipc.lock");
                substr($dir, -length('/lei/store'), 10, '');
                pipe(my ($r, $w)) or die "pipe: $!";
                $w->autoflush(1);
                # Mail we import into lei are private, so headers filtered out
                # by -mda for public mail are not appropriate
                local @PublicInbox::MDA::BAD_HEADERS = ();
-               $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+               $self->{-wq_no_bcast} = 1;
+               $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
                                        lei => $lei,
                                        -err_wr => $w,
                                        to_close => [ $r, $s2d_op_c->{sock} ],
index c4f5ecffb575168044b216139874e2d8a14c5e2d..9bbf0d79c391a7963f029117781334a7ae37bed5 100644 (file)
@@ -12,7 +12,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh
        my ($self, $eml) = @_;
        if (my $xoids = $self->{lse}->xoids_for($eml) // # tries LeiMailSync
                        $self->{lei}->{ale}->xoids_for($eml)) {
-               $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml,
+               $self->{lei}->{sto}->wq_do('update_xvmd', $xoids, $eml,
                                                $self->{vmd_mod});
        } else {
                ++$self->{unimported};
index 9f7171fb04bbc89110fa7d6eedde534c6419e426..a419b83f211cc8757f4988e686dab30dd9266191 100644 (file)
@@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) {
        my $c = $lse->kw_changed($eml, $kw, my $docids = []);
        my $vmd = { kw => $kw };
        if (scalar @$docids) { # already in lei/store
-               $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c;
+               $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c;
        } elsif (my $xoids = $lei->{ale}->xoids_for($eml)) {
                # it's in an external, only set kw, here
-               $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd);
+               $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd);
        } else { # never-before-seen, import the whole thing
                # XXX this is critical in protecting against accidental
                # data loss without --augment
-               $lei->{sto}->ipc_do('set_eml', $eml, $vmd);
+               $lei->{sto}->wq_do('set_eml', $eml, $vmd);
        }
 }
 
@@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) {
                $lse->xsmsg_vmd($smsg) if $lse;
                my $n = _buf2maildir($dst, $bref // \($eml->as_string),
                                        $smsg, $dir);
-               $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
+               $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
                ++$lei->{-nr_write};
        }
 }
@@ -326,7 +326,7 @@ sub _imap_write_cb ($$) {
                }
                # imap_append returns UID if IMAP server has UIDPLUS extension
                ($sto && $uid =~ /\A[0-9]+\z/) and
-                       $sto->ipc_do('set_sync_info',
+                       $sto->wq_do('set_sync_info',
                                        $smsg->{blob}, $$uri, $uid + 0);
                ++$lei->{-nr_write};
        }
@@ -360,7 +360,7 @@ sub _v2_write_cb ($$) {
                my ($bref, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($bref);
                return if $dedupe && $dedupe->is_dup($eml, $smsg);
-               $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add
+               $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
                ++$lei->{-nr_write};
        }
 }
@@ -658,9 +658,10 @@ sub _pre_augment_v2 {
        }
        PublicInbox::InboxWritable->new($ibx, @creat);
        $ibx->init_inbox if @creat;
-       my $v2w = $lei->{v2w} = $ibx->importer;
-       $v2w->ipc_lock_init("$dir/ipc.lock");
-       $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei });
+       my $v2w = $ibx->importer;
+       $v2w->{-wq_no_bcast} = 1;
+       $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
+       $lei->{v2w} = $v2w;
        return if !$lei->{opt}->{shared};
        my $d = "$lei->{ale}->{git}->{git_dir}/objects";
        my $al = "$dir/git/0.git/objects/info/alternates";
@@ -689,7 +690,7 @@ sub do_augment { # slow, runs in wq worker
 sub post_augment {
        my ($self, $lei, @args) = @_;
        my $wait = $lei->{opt}->{'import-before'} ?
-                       $lei->{sto}->ipc_do('checkpoint', 1) : 0;
+                       $lei->{sto}->wq_do('checkpoint', 1) : 0;
        # _post_augment_mbox
        my $m = $self->can("_post_augment_$self->{base_type}") or return;
        $m->($self, $lei, @args);
@@ -774,6 +775,7 @@ sub write_mail { # via ->wq_io_do
 
 sub wq_atexit_child {
        my ($self) = @_;
+       local $PublicInbox::DS::in_loop = 0; # waitpid synchronously
        my $lei = $self->{lei};
        delete $self->{wcb};
        $lei->{ale}->git->async_wait_all;
index 1d49da3d3a4df1f0ef84bf70687f86523262b87f..4583b0678f3c2e7c5f77f52deb5066c5131c23d0 100644 (file)
@@ -269,7 +269,7 @@ sub each_remote_eml { # callback for MboxReader->mboxrd
        my $xoids = $lei->{ale}->xoids_for($eml, 1);
        my $smsg = bless {}, 'PublicInbox::Smsg';
        if ($self->{import_sto} && !$xoids) {
-               my $res = $self->{import_sto}->ipc_do('add_eml', $eml);
+               my $res = $self->{import_sto}->wq_do('add_eml', $eml);
                if (ref($res) eq ref($smsg)) { # totally new message
                        $smsg = $res;
                        $smsg->{kw} = []; # short-circuit xsmsg_vmd
@@ -369,7 +369,7 @@ sub query_remote_mboxrd {
                @$reap_curl = (); # cancel OnDestroy
                die $err if $err;
                my $nr = $lei->{-nr_remote_eml};
-               my $wait = $lei->{sto}->ipc_do('done') if $nr && $lei->{sto};
+               my $wait = $lei->{sto}->wq_do('done') if $nr && $lei->{sto};
                if ($? == 0) {
                        # don't update if no results, maybe MTA is down
                        $key && $nr and
@@ -413,7 +413,7 @@ sub query_done { # EOF callback for main daemon
                warn "BUG: {sto} missing with --mail-sync";
        }
        $lei->sto_done_request if $lei->{sto};
-       my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+       my $wait = $lei->{v2w} ? $lei->{v2w}->wq_do('done') : undef;
        $lei->{ovv}->ovv_end($lei);
        my $start_mua;
        if ($l2m) { # close() calls LeiToMail reap_compress