package PublicInbox::LeiXSearch;
use strict;
use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use Sys::Syslog qw(syslog);
sub new {
my ($class) = @_;
sub attach_external {
my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
- if (!$ibxish->can('over')) {
- push @{$self->{remotes}}, $ibxish
+
+ if (!$ibxish->can('over') || !$ibxish->over) {
+ return push(@{$self->{remotes}}, $ibxish)
}
+ my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+ my $srch = $ibxish->search or
+ return warn("$desc not indexed for Xapian\n");
+ my @shards = $srch->xdb_shards_flat or
+ return warn("$desc has no Xapian shardsXapian\n");
+
if (delete $self->{xdb}) { # XXX: do we need this?
# clobber existing {xdb} if amending
my $expect = delete $self->{nshard};
$nr == $expect or die
"BUG: reloaded $nr shards, expected $expect"
}
- my @shards = $ibxish->search->xdb_shards_flat;
push @{$self->{shards_flat}}, @shards;
push(@{$self->{shard2ibx}}, $ibxish) for (@shards);
}
+# returns a list of local inboxes (or count in scalar context)
+sub locals {
+ my %uniq = map {; "$_" => $_ } @{$_[0]->{shard2ibx} // []};
+ values %uniq;
+}
+
# called by PublicInbox::Search::xdb
-sub xdb_shards_flat { @{$_[0]->{shards_flat}} }
+sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
# like over->get_art
sub smsg_for {
$self->mset($qstr //= 'bytes:1..', $opt);
}
+sub over {}
+
+sub _mset_more ($$) {
+ my ($mset, $mo) = @_;
+ my $size = $mset->size;
+ $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+ my ($self, $lei, $ibxish) = @_;
+ local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ my ($srch, $over) = ($ibxish->search, $ibxish->over);
+ unless ($srch && $over) {
+ my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+ warn "$desc not indexed by Xapian\n";
+ return;
+ }
+ my $mo = { %{$lei->{mset_opt}} };
+ my $mset;
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ do {
+ $mset = $srch->mset($mo->{qstr}, $mo);
+ my $ids = $srch->mset_to_artnums($mset, $mo);
+ my $ctx = { ids => $ids };
+ my $i = 0;
+ my %n2item = map { ($ids->[$i++], $_) } $mset->items;
+ while ($over->expand_thread($ctx)) {
+ for my $n (@{$ctx->{xids}}) {
+ my $smsg = $over->get_art($n) or next;
+ # next if $dd->is_smsg_dup($smsg); TODO
+ my $mitem = delete $n2item{$smsg->{num}};
+ $each_smsg->($smsg, $mitem);
+ # $self->out($buf .= $ORS);
+ # $emit_cb->($smsg);
+ }
+ @{$ctx->{xids}} = ();
+ }
+ } while (_mset_more($mset, $mo));
+ $lei->{ovv}->ovv_atexit_child($lei);
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+ my ($self, $lei, $srcs) = @_;
+ my $mo = { %{$lei->{mset_opt}} };
+ my $mset;
+ local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ $self->attach_external($_) for @$srcs;
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ do {
+ $mset = $self->mset($mo->{qstr}, $mo);
+ for my $it ($mset->items) {
+ my $smsg = smsg_for($self, $it) or next;
+ # next if $dd->is_smsg_dup($smsg);
+ $each_smsg->($smsg, $it);
+ # $self->out($buf .= $ORS) if defined $buf;
+ #$emit_cb->($smsg);
+ }
+ } while (_mset_more($mset, $mo));
+ $lei->{ovv}->ovv_atexit_child($lei);
+}
+
+sub do_query {
+ my ($self, $lei_orig, $qry_done, $srcs) = @_;
+ my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+ $io[0] = $qry_done; # don't need stdin
+ $io[1]->autoflush(1);
+ $io[2]->autoflush(1);
+ if ($lei->{opt}->{thread}) {
+ for my $ibxish (@$srcs) {
+ $self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
+ }
+ } else {
+ $self->wq_do('query_mset', \@io, $lei, $srcs);
+ }
+ # TODO
+ for my $rmt (@{$self->{remotes} // []}) {
+ $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
+ }
+
+ # sent off to children, they will drop remaining references to it
+ close $qry_done;
+}
+
+sub ipc_atfork_child {
+ my ($self) = @_;
+ $SIG{__WARN__} = sub { syslog('warning', "@_") };
+ $self->SUPER::ipc_atfork_child; # PublicInbox::IPC
+}
+
+sub ipc_atfork_prepare {
+ my ($self) = @_;
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+ $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
1;