]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei_xsearch: use per-external queries when not sorting
authorEric Wong <e@80x24.org>
Tue, 13 Apr 2021 10:54:42 +0000 (10:54 +0000)
committerEric Wong <e@80x24.org>
Tue, 13 Apr 2021 19:04:41 +0000 (15:04 -0400)
We only need the combined mset query when we care about sort
order.  When writing to --output destinations intended for MUA
consumption, sort order is irrelevant as MUAs are expected to
offer their own sorting, so run queries to each external in
parallel.

This prepares us for docid-sort-based saved search support.

It will also become faster than the combined mset query for
users with many externals due to current Xapian exhibiting poor
performance with many shards (the same reason -extindex exists)

lib/PublicInbox/LeiXSearch.pm

index 692d5e54a7322a79446939cdb0ecd571ee46bd36..9d367977f28600d3c8cf770ba4a10926ec6bc5a0 100644 (file)
@@ -111,7 +111,7 @@ sub _mset_more ($$) {
 }
 
 # $startq will EOF when do_augment is done augmenting and allow
-# query_mset and query_thread_mset to proceed.
+# query_combined_mset and query_thread_mset to proceed.
 sub wait_startq ($) {
        my ($lei) = @_;
        my $startq = delete $lei->{startq} or return;
@@ -144,9 +144,9 @@ sub mset_progress {
        }
 }
 
-sub query_thread_mset { # for --threads
+sub query_one_mset { # for --threads and l2m w/o sort
        my ($self, $ibxish) = @_;
-       local $0 = "$0 query_thread_mset";
+       local $0 = "$0 query_one_mset";
        my $lei = $self->{lei};
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        my $can_kw = !!$ibxish->can('msg_keywords');
-       my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef;
+       my $threads = $lei->{opt}->{threads} // 0;
+       my $fl = $threads > 1 ? 1 : undef;
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
                mset_progress($lei, $desc, $mset->size,
                                $mset->get_matches_estimated);
                wait_startq($lei); # wait for keyword updates
                my $ids = $srch->mset_to_artnums($mset, $mo);
-               my $ctx = { ids => $ids };
                my $i = 0;
-               my %n2item = map { ($ids->[$i++], $_) } $mset->items;
-               while ($over->expand_thread($ctx)) {
-                       for my $n (@{$ctx->{xids}}) {
-                               my $smsg = $over->get_art($n) or next;
-                               my $mitem = delete $n2item{$smsg->{num}};
-                               next if $smsg->{bytes} == 0;
-                               if ($mitem) {
-                                       if ($can_kw) {
+               if ($threads) {
+                       my $ctx = { ids => $ids };
+                       my %n2item = map { ($ids->[$i++], $_) } $mset->items;
+                       while ($over->expand_thread($ctx)) {
+                               for my $n (@{$ctx->{xids}}) {
+                                       my $smsg = $over->get_art($n) or next;
+                                       my $mitem = delete $n2item{$n};
+                                       next if $smsg->{bytes} == 0;
+                                       if ($mitem && $can_kw) {
                                                mitem_kw($smsg, $mitem, $fl);
-                                       } elsif ($fl) {
+                                       } elsif ($mitem && $fl) {
                                                # call ->xsmsg_vmd, later
                                                $smsg->{lei_q_tt_flagged} = 1;
                                        }
+                                       $each_smsg->($smsg, $mitem);
                                }
+                               @{$ctx->{xids}} = ();
+                       }
+               } else {
+                       my @items = $mset->items;
+                       for my $n (@$ids) {
+                               my $mitem = $items[$i++];
+                               my $smsg = $over->get_art($n) or next;
+                               next if $smsg->{bytes} == 0;
+                               mitem_kw($smsg, $mitem, $fl) if $can_kw;
                                $each_smsg->($smsg, $mitem);
                        }
-                       @{$ctx->{xids}} = ();
                }
        } while (_mset_more($mset, $mo));
-       undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+       undef $each_smsg; # may commit
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_mset { # non-parallel for non-"--threads" users
+sub query_combined_mset { # non-parallel for non-"--threads" users
        my ($self) = @_;
-       local $0 = "$0 query_mset";
+       local $0 = "$0 query_combined_mset";
        my $lei = $self->{lei};
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
@@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users
                        $each_smsg->($smsg, $mitem);
                }
        } while (_mset_more($mset, $mo));
-       undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+       undef $each_smsg; # may commit
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
@@ -379,14 +389,14 @@ sub concurrency {
        $nl + $nr;
 }
 
-sub start_query { # always runs in main (lei-daemon) process
-       my ($self) = @_;
-       if ($self->{threads}) {
+sub start_query ($;$) { # always runs in main (lei-daemon) process
+       my ($self, $l2m) = @_;
+       if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) {
                for my $ibxish (locals($self)) {
-                       $self->wq_io_do('query_thread_mset', [], $ibxish);
+                       $self->wq_io_do('query_one_mset', [], $ibxish);
                }
        } elsif (locals($self)) {
-               $self->wq_io_do('query_mset', []);
+               $self->wq_io_do('query_combined_mset', []);
        }
        my $i = 0;
        my $q = [];
@@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process
 sub incr_start_query { # called whenever an l2m shard starts do_post_auth
        my ($self, $l2m) = @_;
        return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
-       start_query($self);
+       start_query($self, $l2m);
 }
 
 sub ipc_atfork_child {
@@ -448,7 +458,8 @@ sub do_query {
        my $op_c = delete $lei->{pkt_op_c};
        delete $lei->{pkt_op_p};
        @$end = ();
-       $self->{threads} = $lei->{opt}->{threads};
+       $self->{opt_threads} = $lei->{opt}->{threads};
+       $self->{opt_sort} = $lei->{opt}->{'sort'};
        if ($l2m) {
                $l2m->net_merge_complete unless $lei->{auth};
        } else {