]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei: q: lock stdout on overview output
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 68889e813d89319424ea9907d4b115da8a157652..ee93e074b1cacca62055cff1d507399ea5c224c5 100644 (file)
@@ -103,6 +103,8 @@ sub query_thread_mset { # for --thread
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       my $dd = $lei->{dd};
+       $dd->prepare_dedupe;
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
                my $ids = $srch->mset_to_artnums($mset, $mo);
@@ -112,7 +114,7 @@ sub query_thread_mset { # for --thread
                while ($over->expand_thread($ctx)) {
                        for my $n (@{$ctx->{xids}}) {
                                my $smsg = $over->get_art($n) or next;
-                               # next if $dd->is_smsg_dup($smsg); TODO
+                               next if $dd->is_smsg_dup($smsg);
                                my $mitem = delete $n2item{$smsg->{num}};
                                $each_smsg->($smsg, $mitem);
                                # $self->out($buf .= $ORS);
@@ -132,11 +134,13 @@ sub query_mset { # non-parallel for non-"--thread" users
        my $mset;
        $self->attach_external($_) for @$srcs;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       my $dd = $lei->{dd};
+       $dd->prepare_dedupe;
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
                for my $it ($mset->items) {
                        my $smsg = smsg_for($self, $it) or next;
-                       next if $dd->is_smsg_dup($smsg);
+                       next if $dd->is_smsg_dup($smsg);
                        $each_smsg->($smsg, $it);
                        # $self->out($buf .= $ORS) if defined $buf;
                        #$emit_cb->($smsg);
@@ -154,20 +158,21 @@ sub query_done { # PublicInbox::EOFpipe callback
 sub do_query {
        my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-
+       my $remotes = $self->{remotes} // [];
        pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
        $io[0] = $qry_done; # don't need stdin
-       $io[1]->autoflush(1);
-       $io[2]->autoflush(1);
+
        if ($lei->{opt}->{thread}) {
+               $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
                for my $ibxish (@$srcs) {
                        $self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
                }
        } else {
+               $lei->{-parallel} = scalar(@$remotes);
                $self->wq_do('query_mset', \@io, $lei, $srcs);
        }
        # TODO
-       for my $rmt (@{$self->{remotes} // []}) {
+       for my $rmt (@$remotes) {
                $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
        @io = ();