]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: move external vivification to xsearch
authorEric Wong <e@80x24.org>
Sat, 23 Jan 2021 10:27:46 +0000 (10:27 +0000)
committerEric Wong <e@80x24.org>
Sat, 23 Jan 2021 23:45:24 +0000 (23:45 +0000)
This seems like a better place to put it given upcoming
URI support, which starts in this commit.

lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiXSearch.pm
t/lei_xsearch.t

index 7d634b5e947571071709c40cd52824858bd34050..eebf217bb316df88093c4628d189145f2d4b7e5d 100644 (file)
@@ -7,19 +7,6 @@ use strict;
 use v5.10.1;
 use PublicInbox::DS qw(dwaitpid);
 
-sub _vivify_external { # _externals_each callback
-       my ($src, $dir) = @_;
-       if (-f "$dir/ei.lock") {
-               require PublicInbox::ExtSearch;
-               push @$src, PublicInbox::ExtSearch->new($dir);
-       } elsif (-f "$dir/inbox.lock" || -d "$dir/public-inbox") { # v2, v1
-               require PublicInbox::Inbox;
-               push @$src, bless { inboxdir => $dir }, 'PublicInbox::Inbox';
-       } else {
-               warn "W: ignoring $dir, unable to determine type\n";
-       }
-}
-
 # the main "lei q SEARCH_TERMS" method
 sub lei_q {
        my ($self, @argv) = @_;
@@ -27,19 +14,19 @@ sub lei_q {
        require PublicInbox::LeiOverview;
        PublicInbox::Config->json; # preload before forking
        my $opt = $self->{opt};
-       my @srcs; # any number of LeiXSearch || LeiSearch || Inbox
+       my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new;
+       # any number of LeiXSearch || LeiSearch || Inbox
        if ($opt->{'local'} //= 1) { # --local is enabled by default
                my $sto = $self->_lei_store(1);
-               push @srcs, $sto->search;
+               $lxs->prepare_external($sto->search);
        }
 
-       my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new;
        # --external is enabled by default, but allow --no-external
        if ($opt->{external} //= 1) {
-               $self->_externals_each(\&_vivify_external, \@srcs);
+               my $cb = $lxs->can('prepare_external');
+               $self->_externals_each($cb, $lxs);
        }
-       my $xj = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
-       $xj = 1 if !$opt->{thread};
+       my $xj = $opt->{thread} ? $lxs->locals : ($lxs->remotes + 1);
        my $ovv = PublicInbox::LeiOverview->new($self) or return;
        $self->atfork_prepare_wq($lxs);
        $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset);
@@ -76,7 +63,7 @@ sub lei_q {
        $mset_opt{relevance} //= -2 if $opt->{thread};
        $self->{mset_opt} = \%mset_opt;
        $ovv->ovv_begin($self);
-       $lxs->do_query($self, \@srcs);
+       $lxs->do_query($self);
 }
 
 1;
index 987a9896c113d48218dde9cf11ff0d80ecddc13c..10c2524678c6bbfedb3fbb21582e41ee5e8322d9 100644 (file)
@@ -26,10 +26,6 @@ sub new {
 
 sub attach_external {
        my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
-
-       if (!$ibxish->can('over') || !$ibxish->over) {
-               return push(@{$self->{remotes}}, $ibxish)
-       }
        my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
        my $srch = $ibxish->search or
                return warn("$desc not indexed for Xapian\n");
@@ -59,10 +55,9 @@ sub attach_external {
 }
 
 # returns a list of local inboxes (or count in scalar context)
-sub locals {
-       my %uniq = map {; "$_" => $_ } @{$_[0]->{shard2ibx} // []};
-       values %uniq;
-}
+sub locals { @{$_[0]->{locals} // []} }
+
+sub remotes { @{$_[0]->{remotes} // []} }
 
 # called by PublicInbox::Search::xdb
 sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
@@ -148,14 +143,16 @@ sub query_thread_mset { # for --thread
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
-       my ($self, $lei, $srcs) = @_;
+       my ($self, $lei) = @_;
        local $0 = "$0 query_mset";
        my $startq = delete $self->{5};
        my %sig = $lei->atfork_child_wq($self);
        local @SIG{keys %sig} = values %sig;
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
-       $self->attach_external($_) for @$srcs;
+       for my $loc (locals($self)) {
+               attach_external($self, $loc);
+       }
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
        my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
        $dedupe->prepare_dedupe;
@@ -172,6 +169,10 @@ sub query_mset { # non-parallel for non-"--thread" users
        $lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub query_remote_mboxrd {
+       my ($self, $lei, $uri) = @_;
+}
+
 sub git {
        my ($self) = @_;
        my (%seen, @dirs);
@@ -221,18 +222,17 @@ sub do_post_augment {
 }
 
 sub start_query { # always runs in main (lei-daemon) process
-       my ($self, $io, $lei, $srcs) = @_;
-       my $remotes = $self->{remotes} // [];
+       my ($self, $io, $lei) = @_;
        if ($lei->{opt}->{thread}) {
-               for my $ibxish (@$srcs) {
+               for my $ibxish (locals($self)) {
                        $self->wq_do('query_thread_mset', $io, $lei, $ibxish);
                }
        } else {
-               $self->wq_do('query_mset', $io, $lei, $srcs);
+               $self->wq_do('query_mset', $io, $lei);
        }
        # TODO
-       for my $rmt (@$remotes) {
-               $self->wq_do('query_thread_mbox', $io, $lei, $rmt);
+       for my $uri (remotes($self)) {
+               $self->wq_do('query_remote_mboxrd', $io, $lei, $uri);
        }
        @$io = ();
 }
@@ -259,7 +259,7 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 }
 
 sub do_query {
-       my ($self, $lei_orig, $srcs) = @_;
+       my ($self, $lei_orig) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
        $io[0] = undef;
        pipe(my $done, $io[0]) or die "pipe $!";
@@ -286,7 +286,7 @@ sub do_query {
                $io[5] = $startq;
                $io[1] = $zpipe->[1] if $zpipe;
        }
-       start_query($self, \@io, $lei, $srcs);
+       start_query($self, \@io, $lei);
        $self->wq_close(1);
        unless ($in_loop) {
                # for the $lei->atfork_child_wq PIPE handler:
@@ -302,4 +302,25 @@ sub ipc_atfork_prepare {
        $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
+sub prepare_external {
+       my ($self, $loc, $boost) = @_; # n.b. already ordered by boost
+       if (ref $loc) { # already a URI, or PublicInbox::Inbox-like object
+               return push(@{$self->{remotes}}, $loc) if $loc->can('scheme');
+       } elsif ($loc =~ m!\Ahttps?://!) {
+               require URI;
+               return push(@{$self->{remotes}}, URI->new($loc));
+       } elsif (-f "$loc/ei.lock") {
+               require PublicInbox::ExtSearch;
+               $loc = PublicInbox::ExtSearch->new($loc);
+       } elsif (-f "$loc/inbox.lock" || -d "$loc/public-inbox") {
+               require PublicInbox::Inbox; # v2, v1
+               $loc = bless { inboxdir => $loc }, 'PublicInbox::Inbox';
+       } else {
+               warn "W: ignoring $loc, unable to determine type\n";
+               return;
+       }
+       push @{$self->{locals}}, $loc;
+}
+
+
 1;
index 8b03c1f26d8089c9ae52f810ba65bb042fd05eda..f745ea3e2bfba9566a73cf560fd091733b41b383 100644 (file)
@@ -49,7 +49,10 @@ $eidx->eidx_sync({fsync => 0});
 my $es = PublicInbox::ExtSearch->new("$home/eidx");
 my $lxs = PublicInbox::LeiXSearch->new;
 for my $ibxish (shuffle($es, @ibx)) {
-       $lxs->attach_external($ibxish);
+       $lxs->prepare_external($ibxish);
+}
+for my $loc ($lxs->locals) {
+       $lxs->attach_external($loc);
 }
 my $nr = $lxs->xdb->get_doccount;
 my $mset = $lxs->mset('d:19931002..19931003', { limit => $nr });