my $mo = { %{$lei->{mset_opt}} };
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ my $dd = $lei->{dd};
+ $dd->prepare_dedupe;
do {
$mset = $srch->mset($mo->{qstr}, $mo);
my $ids = $srch->mset_to_artnums($mset, $mo);
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
- # next if $dd->is_smsg_dup($smsg); TODO
+ next if $dd->is_smsg_dup($smsg);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
# $self->out($buf .= $ORS);
my $mset;
$self->attach_external($_) for @$srcs;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ my $dd = $lei->{dd};
+ $dd->prepare_dedupe;
do {
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
- # next if $dd->is_smsg_dup($smsg);
+ next if $dd->is_smsg_dup($smsg);
$each_smsg->($smsg, $it);
# $self->out($buf .= $ORS) if defined $buf;
#$emit_cb->($smsg);
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-
+ my $remotes = $self->{remotes} // [];
pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
$io[0] = $qry_done; # don't need stdin
- $io[1]->autoflush(1);
- $io[2]->autoflush(1);
+
if ($lei->{opt}->{thread}) {
+ $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
for my $ibxish (@$srcs) {
$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
}
} else {
+ $lei->{-parallel} = scalar(@$remotes);
$self->wq_do('query_mset', \@io, $lei, $srcs);
}
# TODO
- for my $rmt (@{$self->{remotes} // []}) {
+ for my $rmt (@$remotes) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
@io = ();