use strict;
use v5.10.1;
use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
-use Sys::Syslog qw(syslog);
sub new {
my ($class) = @_;
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
- local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
+
my ($srch, $over) = ($ibxish->search, $ibxish->over);
unless ($srch && $over) {
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
}
my $mo = { %{$lei->{mset_opt}} };
my $mset;
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ my $dd = $lei->{dd};
+ $dd->prepare_dedupe;
do {
$mset = $srch->mset($mo->{qstr}, $mo);
my $ids = $srch->mset_to_artnums($mset, $mo);
my $ctx = { ids => $ids };
my $i = 0;
- my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+ my %n2item = map { ($ids->[$i++], $_) } $mset->items;
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
- # next if $dd->is_smsg_dup($smsg); TODO
- if (my $p = delete $n2p{$smsg->{num}}) {
- $smsg->{relevance} = $p;
- }
- print { $self->{1} } Dumper($smsg);
+ next if $dd->is_smsg_dup($smsg);
+ my $mitem = delete $n2item{$smsg->{num}};
+ $each_smsg->($smsg, $mitem);
# $self->out($buf .= $ORS);
# $emit_cb->($smsg);
}
@{$ctx->{xids}} = ();
}
} while (_mset_more($mset, $mo));
+ $lei->{ovv}->ovv_atexit_child($lei);
}
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
my $mo = { %{$lei->{mset_opt}} };
my $mset;
- local %SIG = (%SIG, $lei->atfork_child_wq($self));
$self->attach_external($_) for @$srcs;
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ my $dd = $lei->{dd};
+ $dd->prepare_dedupe;
do {
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
- # next if $dd->is_smsg_dup($smsg);
- $smsg->{relevance} = get_pct($it);
- use Data::Dumper;
- print { $self->{1} } Dumper($smsg);
+ next if $dd->is_smsg_dup($smsg);
+ $each_smsg->($smsg, $it);
# $self->out($buf .= $ORS) if defined $buf;
#$emit_cb->($smsg);
}
} while (_mset_more($mset, $mo));
+ $lei->{ovv}->ovv_atexit_child($lei);
+}
+
+sub query_done { # PublicInbox::EOFpipe callback
+ my ($lei) = @_;
+ $lei->{ovv}->ovv_end($lei);
+ $lei->dclose;
}
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+ pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+ $io[0] = $qry_done; # don't need stdin
$io[1]->autoflush(1);
$io[2]->autoflush(1);
if ($lei->{opt}->{thread}) {
for my $rmt (@{$self->{remotes} // []}) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
+ @io = ();
+ close $qry_done; # fully closed when children are done
+
+ # query_done will run when query_*mset close $qry_done
+ if ($lei_orig->{sock}) { # watch for client premature exit
+ require PublicInbox::EOFpipe;
+ PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+ $lei_orig->{lxs} = $self;
+ $lei_orig->event_step_init;
+ } else {
+ $self->wq_close;
+ read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+ query_done($lei_orig); # may SIGPIPE
+ }
}
-sub ipc_atfork_child {
+sub ipc_atfork_prepare {
my ($self) = @_;
- $SIG{__WARN__} = sub { syslog('warning', "@_") };
- $self->SUPER::ipc_atfork_child; # PublicInbox::IPC
+ $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
+ $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
}
1;