]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: reinstate smsg dedupe
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index b41727342de7c1e65a95c360ec0bf300c27f705f..80e7a7f76124d74ce36b868a709370459e5c4a4c 100644 (file)
@@ -8,8 +8,6 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
-use Sys::Syslog qw(syslog);
 
 sub new {
        my ($class) = @_;
@@ -93,7 +91,9 @@ sub _mset_more ($$) {
 
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
-       local %SIG = (%SIG, $lei->atfork_child_wq($self));
+       my %sig = $lei->atfork_child_wq($self);
+       local @SIG{keys %sig} = values %sig;
+
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
        unless ($srch && $over) {
                my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -102,51 +102,65 @@ sub query_thread_mset { # for --thread
        }
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
+       my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       my $dd = $lei->{dd};
+       $dd->prepare_dedupe;
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
                my $ids = $srch->mset_to_artnums($mset, $mo);
                my $ctx = { ids => $ids };
                my $i = 0;
-               my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+               my %n2item = map { ($ids->[$i++], $_) } $mset->items;
                while ($over->expand_thread($ctx)) {
                        for my $n (@{$ctx->{xids}}) {
                                my $smsg = $over->get_art($n) or next;
-                               # next if $dd->is_smsg_dup($smsg); TODO
-                               if (my $p = delete $n2p{$smsg->{num}}) {
-                                       $smsg->{relevance} = $p;
-                               }
-                               print { $self->{1} } Dumper($smsg);
+                               next if $dd->is_smsg_dup($smsg);
+                               my $mitem = delete $n2item{$smsg->{num}};
+                               $each_smsg->($smsg, $mitem);
                                # $self->out($buf .= $ORS);
                                # $emit_cb->($smsg);
                        }
                        @{$ctx->{xids}} = ();
                }
        } while (_mset_more($mset, $mo));
+       $lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei, $srcs) = @_;
+       my %sig = $lei->atfork_child_wq($self);
+       local @SIG{keys %sig} = values %sig;
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
-       local %SIG = (%SIG, $lei->atfork_child_wq($self));
        $self->attach_external($_) for @$srcs;
+       my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       my $dd = $lei->{dd};
+       $dd->prepare_dedupe;
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
                for my $it ($mset->items) {
                        my $smsg = smsg_for($self, $it) or next;
-                       # next if $dd->is_smsg_dup($smsg);
-                       $smsg->{relevance} = get_pct($it);
-                       use Data::Dumper;
-                       print { $self->{1} } Dumper($smsg);
+                       next if $dd->is_smsg_dup($smsg);
+                       $each_smsg->($smsg, $it);
                        # $self->out($buf .= $ORS) if defined $buf;
                        #$emit_cb->($smsg);
                }
        } while (_mset_more($mset, $mo));
+       $lei->{ovv}->ovv_atexit_child($lei);
+}
+
+sub query_done { # PublicInbox::EOFpipe callback
+       my ($lei) = @_;
+       $lei->{ovv}->ovv_end($lei);
+       $lei->dclose;
 }
 
 sub do_query {
        my ($self, $lei_orig, $srcs) = @_;
        my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+       pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+       $io[0] = $qry_done; # don't need stdin
        $io[1]->autoflush(1);
        $io[2]->autoflush(1);
        if ($lei->{opt}->{thread}) {
@@ -160,12 +174,26 @@ sub do_query {
        for my $rmt (@{$self->{remotes} // []}) {
                $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
        }
+       @io = ();
+       close $qry_done; # fully closed when children are done
+
+       # query_done will run when query_*mset close $qry_done
+       if ($lei_orig->{sock}) { # watch for client premature exit
+               require PublicInbox::EOFpipe;
+               PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+               $lei_orig->{lxs} = $self;
+               $lei_orig->event_step_init;
+       } else {
+               $self->wq_close;
+               read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+               query_done($lei_orig); # may SIGPIPE
+       }
 }
 
-sub ipc_atfork_child {
+sub ipc_atfork_prepare {
        my ($self) = @_;
-       $SIG{__WARN__} = sub { syslog('warning', "@_") };
-       $self->SUPER::ipc_atfork_child; # PublicInbox::IPC
+       $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
 1;