]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei q: start ->mset while query_prepare runs
authorEric Wong <e@80x24.org>
Tue, 19 Jan 2021 09:34:27 +0000 (09:34 +0000)
committerEric Wong <e@80x24.org>
Thu, 21 Jan 2021 03:29:06 +0000 (03:29 +0000)
We don't need the result of query_prepare (for augmenting or
mass unlinking) until we're ready to deduplicate and write
results to the filesystem.  This ought to let us hide some of
the cost of Xapian searches on multi-device/core systems for
extremely expensive searches.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Spawn.pm

index 6b6ee0f5e8edb8256c08a6674e196fab8c1690c5..4b1dc673bb6123f19f40f374877ec35d6f0a232f 100644 (file)
@@ -293,7 +293,7 @@ sub atfork_child_wq {
        my ($sock, $l2m_wq_s1);
        (@$self{qw(0 1 2)}, $sock, $l2m_wq_s1) = delete(@$wq{0..4});
        $self->{sock} = $sock if -S $sock;
-       $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1;
+       $self->{l2m}->{-wq_s1} = $l2m_wq_s1 if $l2m_wq_s1 && -S $l2m_wq_s1;
        %PATH2CFG = ();
        $quit = \&CORE::exit;
        @TO_CLOSE_ATFORK_CHILD = ();
index dcf6d8a3fce455381d8be7673752950ad04460a8..a1dce550230ef9aac2355ab10a3a7c8148b2a89e 100644 (file)
@@ -440,6 +440,7 @@ sub lock_free {
 
 sub write_mail { # via ->wq_do
        my ($self, $git_dir, $oid, $lei, $kw) = @_;
+       my $not_done = delete $self->{4}; # write end of {each_smsg_done}
        my $wcb = $self->{wcb} //= do { # first message
                my %sig = $lei->atfork_child_wq($self);
                @SIG{keys %sig} = values %sig; # not local
@@ -447,7 +448,7 @@ sub write_mail { # via ->wq_do
                $self->write_cb($lei);
        };
        my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-       $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw ]);
+       $git->cat_async($oid, \&git_to_mail, [ $wcb, $kw, $not_done ]);
 }
 
 sub ipc_atfork_prepare {
index dc5cf3b65f453cb0eeabe297cc5351e8c67cb7cf..73fd17f49ba7b8dc671a4e8951949088f4826433 100644 (file)
@@ -94,8 +94,17 @@ sub _mset_more ($$) {
        $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
 }
 
+# $startq will EOF when query_prepare is done augmenting and allow
+# query_mset and query_thread_mset to proceed.
+sub wait_startq ($) {
+       my ($startq) = @_;
+       $_[0] = undef;
+       read($startq, my $query_prepare_done, 1);
+}
+
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
+       my $startq = delete $self->{5};
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
 
@@ -119,6 +128,7 @@ sub query_thread_mset { # for --thread
                while ($over->expand_thread($ctx)) {
                        for my $n (@{$ctx->{xids}}) {
                                my $smsg = $over->get_art($n) or next;
+                               wait_startq($startq) if $startq;
                                next if $dedupe->is_smsg_dup($smsg);
                                my $mitem = delete $n2item{$smsg->{num}};
                                $each_smsg->($smsg, $mitem);
@@ -132,6 +142,7 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei, $srcs) = @_;
+       my $startq = delete $self->{5};
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
        my $mo = { %{$lei->{mset_opt}} };
@@ -144,6 +155,7 @@ sub query_mset { # non-parallel for non-"--thread" users
                $mset = $self->mset($mo->{qstr}, $mo);
                for my $it ($mset->items) {
                        my $smsg = smsg_for($self, $it) or next;
+                       wait_startq($startq) if $startq;
                        next if $dedupe->is_smsg_dup($smsg);
                        $each_smsg->($smsg, $it);
                }
@@ -207,47 +219,42 @@ sub start_query { # always runs in main (lei-daemon) process
        @$io = ();
 }
 
-sub query_prepare { # wq_do
+sub query_prepare { # for wq_do,
        my ($self, $lei) = @_;
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
-       if (my $l2m = $lei->{l2m}) {
-               eval { $l2m->do_augment($lei) };
-               return $lei->fail($@) if $@;
-       }
-       # trigger PublicInbox::OpPipe->event_step
-       my $qry_status_wr = $lei->{0} or
-               return $lei->fail('BUG: qry_status_wr missing');
-       $qry_status_wr->autoflush(1);
-       print $qry_status_wr '.' or # this should never fail...
-               return $lei->fail("BUG? print qry_status_wr: $!");
+       eval { $lei->{l2m}->do_augment($lei) };
+       $lei->fail($@) if $@;
 }
 
 sub do_query {
        my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
        $io[0] = undef;
-       pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+       pipe(my $done, $io[0]) or die "pipe $!";
 
        $lei_orig->event_step_init; # wait for shutdowns
-       my $op_map = { '' => [ \&query_done, $self, $lei_orig ] };
+       my $done_op = { '' => [ \&query_done, $self, $lei_orig ] };
        my $in_loop = exists $lei_orig->{sock};
-       my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+       $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
        my $l2m = $lei->{l2m};
        if ($l2m) {
                $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
                $io[1] = $lei_orig->{1};
-               $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
-               $self->wq_do('query_prepare', \@io, $lei);
-               $opp->event_step if !$in_loop;
-       } else {
-               start_query($self, \@io, $lei, $srcs);
+               my @l2m_io = (undef, @io[1..$#io]);
+               pipe(my $startq, $l2m_io[0]) or die "pipe: $!";
+               $self->wq_do('query_prepare', \@l2m_io, $lei);
+               $io[4] //= *STDERR{GLOB};
+               die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
+               fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
+               $io[5] = $startq;
        }
+       start_query($self, \@io, $lei, $srcs);
        unless ($in_loop) {
                my @pids = $self->wq_close;
                # for the $lei->atfork_child_wq PIPE handler:
-               $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
-               $opp->event_step;
+               $done_op->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+               $done->event_step;
                my $ipc_worker_reap = $self->can('ipc_worker_reap');
                if (my $l2m_pids = delete $self->{l2m_pids}) {
                        dwaitpid($_, $ipc_worker_reap, $l2m) for @$l2m_pids;
@@ -258,8 +265,9 @@ sub do_query {
 
 sub ipc_atfork_prepare {
        my ($self) = @_;
-       # (qry_status_wr, stdout|mbox, stderr, 3: sock, 4: $l2m->{-wq_s1})
-       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&=]);
+       # (0: qry_status_wr, 1: stdout|mbox, 2: stderr,
+       #  3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
+       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]);
        $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
index b03f2d59abca5c2f921abdda68ce046a16505c52..376d219096e7d66d3e781f3e3cf1030c2791ed54 100644 (file)
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 5
+#define SEND_FD_CAPA 6
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
        struct cmsghdr hdr;