X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiQuery.pm;h=040c284d5d08075f040cac2392d136dcd8fe82c1;hb=0c89ebd477d1c7a695a0a0b3023c0d41abe573fa;hp=f69dccadbd5b167357e849cc6936f2e8d2477f13;hpb=372ff2ba6467e8fcea3eb19e5527a5fc398802f9;p=public-inbox.git diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index f69dccad..040c284d 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -8,7 +8,7 @@ use v5.10.1; use PublicInbox::MID qw($MID_EXTRACT); use POSIX qw(strftime); use PublicInbox::Address qw(pairs); -use PublicInbox::Search qw(get_pct); +use PublicInbox::DS qw(dwaitpid); sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) } @@ -61,37 +61,30 @@ sub lei_q { my $sto = $self->_lei_store(1); my $cfg = $self->_lei_cfg(1); my $opt = $self->{opt}; - my $qstr = join(' ', map {; - # Consider spaces in argv to be for phrase search in Xapian. - # In other words, the users should need only care about - # normal shell quotes and not have to learn Xapian quoting. - /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_ - } @argv); - $opt->{limit} //= 10000; - my $lxs; require PublicInbox::LeiDedupe; my $dd = PublicInbox::LeiDedupe->new($self); # --local is enabled by default - my @src = $opt->{'local'} ? ($sto->search) : (); + # src: LeiXSearch || LeiSearch || Inbox + my @srcs = $opt->{'local'} ? ($sto->search) : (); + require PublicInbox::LeiXSearch; + my $lxs = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external if ($opt->{external} // 1) { - $self->_externals_each(\&_vivify_external, \@src); - # {tid} is not unique between indices, so we have to search - # each src individually - if (!$opt->{thread}) { - require PublicInbox::LeiXSearch; - my $lxs = PublicInbox::LeiXSearch->new; - # local is always first - $lxs->attach_external($_) for @src; - @src = ($lxs); - } + $self->_externals_each(\&_vivify_external, \@srcs); } - my $out = $self->{output} // '-'; + my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs); + $j = 1 if !$opt->{thread}; + if ($self->{pid}) { + $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) + // $self->wq_workers($j); + } + my $out = $opt->{output} // '-'; $out = 'json:/dev/stdout' if $out eq '-'; my $isatty = -t $self->{1}; - $self->start_pager if $isatty; + # no forking workers after this + my $pid_old12 = $self->start_pager if $isatty; my $json = substr($out, 0, 5) eq 'json:' ? ref(PublicInbox::Config->json)->new : undef; if ($json) { @@ -104,10 +97,14 @@ sub lei_q { $json->canonical; } - # src: LeiXSearch || LeiSearch || Inbox my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); - delete $mset_opt{limit} if $opt->{limit} < 0; $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; + $mset_opt{qstr} = join(' ', map {; + # Consider spaces in argv to be for phrase search in Xapian. + # In other words, the users should need only care about + # normal shell quotes and not have to learn Xapian quoting. + /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_ + } @argv); if (defined(my $sort = $opt->{'sort'})) { if ($sort eq 'relevance') { $mset_opt{relevance} = 1; @@ -123,59 +120,12 @@ sub lei_q { # descending docid order $mset_opt{relevance} //= -2 if $opt->{thread}; # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); - - # even w/o pretty, do the equivalent of a --pretty=oneline - # output so "lei q SEARCH_TERMS | wc -l" can be useful: - my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n"; - my $buf; - - # we can generate too many records to hold in RAM, so we stream - # and fake a JSON array starting here: - $self->out('[') if $json; - my $emit_cb = sub { - my ($smsg) = @_; - delete @$smsg{qw(tid num)}; # only makes sense if single src - chomp($buf = $json->encode(_smsg_unbless($smsg))); - }; - $dd->prepare_dedupe; - for my $src (@src) { - my $srch = $src->search; - my $over = $src->over; - my $smsg_for = $src->can('smsg_for'); # LeiXSearch - my $mo = { %mset_opt }; - my $mset = $srch->mset($qstr, $mo); - my $ctx = {}; - if ($smsg_for) { - for my $it ($mset->items) { - my $smsg = $smsg_for->($srch, $it) or next; - next if $dd->is_smsg_dup($smsg); - $self->out($buf .= $ORS) if defined $buf; - $smsg->{relevance} = get_pct($it); - $emit_cb->($smsg); - } - } else { # --thread - my $ids = $srch->mset_to_artnums($mset, $mo); - $ctx->{ids} = $ids; - my $i = 0; - my %n2p = map { - ($ids->[$i++], get_pct($_)); - } $mset->items; - undef $mset; - while ($over && $over->expand_thread($ctx)) { - for my $n (@{$ctx->{xids}}) { - my $t = $over->get_art($n) or next; - next if $dd->is_smsg_dup($t); - if (my $p = delete $n2p{$t->{num}}) { - $t->{relevance} = $p; - } - $self->out($buf .= $ORS); - $emit_cb->($t); - } - @{$ctx->{xids}} = (); - } - } + $self->{mset_opt} = \%mset_opt; + $lxs->do_query($self, \@srcs); + if ($pid_old12) { + $self->{$_} = $pid_old12->[$_] for (1, 2); + dwaitpid($pid_old12->[0], undef, $self->{sock}); } - $self->out($buf .= "]\n"); # done } 1;