my $len = $self->{wq_req_len} // (4096 * 33);
my ($rec, $sub, @args);
my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+ local $SIG{PIPE} = sub {
+ die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+ };
until ($self->{-wq_quit}) {
my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
my $i = 0;
my $mode = shift(@m);
if (open(my $fh, $mode, $fd)) {
$self->{$i++} = $fh;
+ $fh->autoflush(1);
} else {
die "$$ open($mode$fd) (FD:$i): $!";
}
die "thaw error on buffer of size:".length($buf);
($sub, @args) = @$rec;
eval { $self->$sub(@args) };
- warn "$$ wq_worker: $@" if $@;
- delete @$self{0, 1, 2};
+ warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+ undef $sub; # quiet SIG{PIPE} handler
+ # need to close explicitly to avoid warnings after SIGPIPE
+ close($_) for (delete(@$self{0..2}));
}
}
PublicInbox::DS::sig_setmask($oldset);
my $on_destroy = $self->ipc_atfork_child;
eval { wq_worker_loop($self) };
- die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
- exit;
+ warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+ exit($@ ? 1 : 0);
} else {
$self->{-wq_workers}->{$pid} = \undef;
}
undef;
}
+# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+sub atfork_child_wq {
+ my ($self, $wq) = @_;
+ $self->{sock} //= $wq->{0};
+ $self->{$_} //= $wq->{$_} for (0..2);
+ my $oldpipe = $SIG{PIPE};
+ (
+ __WARN__ => sub { err($self, @_) },
+ PIPE => sub {
+ $self->x_it(141);
+ $oldpipe->() if ref($oldpipe) eq 'CODE';
+ }
+ );
+}
+
+# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
+sub atfork_prepare_wq {
+ my ($self, $wq) = @_;
+ if ($wq->wq_workers) {
+ my $ret = bless { %$self }, ref($self);
+ my $in = delete $ret->{0};
+ ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+ } else {
+ ($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+ }
+}
+
sub _help ($;$) {
my ($self, $errmsg) = @_;
my $cmd = $self->{cmd} // 'COMMAND';
$self->{1} = $wpager;
$self->{2} = $wpager if -t $self->{2};
my $pid = spawn([$pager], $env, $rdr);
- dwaitpid($pid, undef, $self->{sock});
$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
+ [ $pid, @$rdr{1, 2} ];
}
sub accept_dispatch { # Listener {post_accept} callback
sub noop {}
+our $oldset; sub oldset { $oldset }
+
# lei(1) calls this when it can't connect
sub lazy_start {
my ($path, $errno, $nfd) = @_;
my @st = stat($path) or die "stat($path): $!";
my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
- my $oldset = PublicInbox::DS::block_signals();
+ local $oldset = PublicInbox::DS::block_signals();
if ($nfd == 1) {
require PublicInbox::CmdIPC1;
$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
};
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
local %SIG = (%SIG, %$sig) if !$sigfd;
+ local $SIG{PIPE} = 'IGNORE';
if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
PublicInbox::DS->SetLoopTimeout(5000);
} else {
use PublicInbox::MID qw($MID_EXTRACT);
use POSIX qw(strftime);
use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
my $sto = $self->_lei_store(1);
my $cfg = $self->_lei_cfg(1);
my $opt = $self->{opt};
- my $qstr = join(' ', map {;
- # Consider spaces in argv to be for phrase search in Xapian.
- # In other words, the users should need only care about
- # normal shell quotes and not have to learn Xapian quoting.
- /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
- } @argv);
- $opt->{limit} //= 10000;
- my $lxs;
require PublicInbox::LeiDedupe;
my $dd = PublicInbox::LeiDedupe->new($self);
# --local is enabled by default
- my @src = $opt->{'local'} ? ($sto->search) : ();
+ # src: LeiXSearch || LeiSearch || Inbox
+ my @srcs = $opt->{'local'} ? ($sto->search) : ();
+ require PublicInbox::LeiXSearch;
+ my $lxs = PublicInbox::LeiXSearch->new;
# --external is enabled by default, but allow --no-external
if ($opt->{external} // 1) {
- $self->_externals_each(\&_vivify_external, \@src);
- # {tid} is not unique between indices, so we have to search
- # each src individually
- if (!$opt->{thread}) {
- require PublicInbox::LeiXSearch;
- my $lxs = PublicInbox::LeiXSearch->new;
- # local is always first
- $lxs->attach_external($_) for @src;
- @src = ($lxs);
- }
+ $self->_externals_each(\&_vivify_external, \@srcs);
}
- my $out = $self->{output} // '-';
+ my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+ $j = 1 if !$opt->{thread};
+ if ($self->{pid}) {
+ $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+ // $self->wq_workers($j);
+ }
+ my $out = $opt->{output} // '-';
$out = 'json:/dev/stdout' if $out eq '-';
my $isatty = -t $self->{1};
- $self->start_pager if $isatty;
+ # no forking workers after this
+ my $pid_old12 = $self->start_pager if $isatty;
my $json = substr($out, 0, 5) eq 'json:' ?
ref(PublicInbox::Config->json)->new : undef;
if ($json) {
$json->canonical;
}
- # src: LeiXSearch || LeiSearch || Inbox
my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
- delete $mset_opt{limit} if $opt->{limit} < 0;
$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+ $mset_opt{qstr} = join(' ', map {;
+ # Consider spaces in argv to be for phrase search in Xapian.
+ # In other words, the users should need only care about
+ # normal shell quotes and not have to learn Xapian quoting.
+ /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+ } @argv);
if (defined(my $sort = $opt->{'sort'})) {
if ($sort eq 'relevance') {
$mset_opt{relevance} = 1;
# descending docid order
$mset_opt{relevance} //= -2 if $opt->{thread};
# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
- # even w/o pretty, do the equivalent of a --pretty=oneline
- # output so "lei q SEARCH_TERMS | wc -l" can be useful:
- my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
- my $buf;
-
- # we can generate too many records to hold in RAM, so we stream
- # and fake a JSON array starting here:
- $self->out('[') if $json;
- my $emit_cb = sub {
- my ($smsg) = @_;
- delete @$smsg{qw(tid num)}; # only makes sense if single src
- chomp($buf = $json->encode(_smsg_unbless($smsg)));
- };
- $dd->prepare_dedupe;
- for my $src (@src) {
- my $srch = $src->search;
- my $over = $src->over;
- my $smsg_for = $src->can('smsg_for'); # LeiXSearch
- my $mo = { %mset_opt };
- my $mset = $srch->mset($qstr, $mo);
- my $ctx = {};
- if ($smsg_for) {
- for my $it ($mset->items) {
- my $smsg = $smsg_for->($srch, $it) or next;
- next if $dd->is_smsg_dup($smsg);
- $self->out($buf .= $ORS) if defined $buf;
- $smsg->{relevance} = get_pct($it);
- $emit_cb->($smsg);
- }
- } else { # --thread
- my $ids = $srch->mset_to_artnums($mset, $mo);
- $ctx->{ids} = $ids;
- my $i = 0;
- my %n2p = map {
- ($ids->[$i++], get_pct($_));
- } $mset->items;
- undef $mset;
- while ($over && $over->expand_thread($ctx)) {
- for my $n (@{$ctx->{xids}}) {
- my $t = $over->get_art($n) or next;
- next if $dd->is_smsg_dup($t);
- if (my $p = delete $n2p{$t->{num}}) {
- $t->{relevance} = $p;
- }
- $self->out($buf .= $ORS);
- $emit_cb->($t);
- }
- @{$ctx->{xids}} = ();
- }
- }
+ $self->{mset_opt} = \%mset_opt;
+ $lxs->do_query($self, \@srcs);
+ if ($pid_old12) {
+ $self->{$_} = $pid_old12->[$_] for (1, 2);
+ dwaitpid($pid_old12->[0], undef, $self->{sock});
}
- $self->out($buf .= "]\n"); # done
}
1;
package PublicInbox::LeiXSearch;
use strict;
use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::Search qw(get_pct);
sub new {
my ($class) = @_;
sub over {}
+sub _mset_more ($$) {
+ my ($mset, $mo) = @_;
+ my $size = $mset->size;
+ $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+ my ($self, $lei, $ibxish) = @_;
+ my ($srch, $over) = ($ibxish->search, $ibxish->over);
+ unless ($srch && $over) {
+ my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+ warn "$desc not indexed by Xapian\n";
+ return;
+ }
+ local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ my $mo = { %{$lei->{mset_opt}} };
+ my $mset;
+ do {
+ $mset = $srch->mset($mo->{qstr}, $mo);
+ my $ids = $srch->mset_to_artnums($mset, $mo);
+ my $ctx = { ids => $ids };
+ my $i = 0;
+ my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+ while ($over->expand_thread($ctx)) {
+ for my $n (@{$ctx->{xids}}) {
+ my $smsg = $over->get_art($n) or next;
+ # next if $dd->is_smsg_dup($smsg); TODO
+ if (my $p = delete $n2p{$smsg->{num}}) {
+ $smsg->{relevance} = $p;
+ }
+ print { $self->{1} } Dumper($smsg);
+ # $self->out($buf .= $ORS);
+ # $emit_cb->($smsg);
+ }
+ @{$ctx->{xids}} = ();
+ }
+ } while (_mset_more($mset, $mo));
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+ my ($self, $lei, $srcs) = @_;
+ my $mo = { %{$lei->{mset_opt}} };
+ my $mset;
+ local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ $self->attach_external($_) for @$srcs;
+ do {
+ $mset = $self->mset($mo->{qstr}, $mo);
+ for my $it ($mset->items) {
+ my $smsg = smsg_for($self, $it) or next;
+ # next if $dd->is_smsg_dup($smsg);
+ $smsg->{relevance} = get_pct($it);
+ use Data::Dumper;
+ print { $self->{1} } Dumper($smsg);
+ # $self->out($buf .= $ORS) if defined $buf;
+ #$emit_cb->($smsg);
+ }
+ } while (_mset_more($mset, $mo));
+}
+
+sub do_query {
+ my ($self, $lei_orig, $srcs) = @_;
+ my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+ $io[1]->autoflush(1);
+ $io[2]->autoflush(1);
+ if ($lei->{opt}->{thread}) {
+ for my $ibxish (@$srcs) {
+ $self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+ }
+ } else {
+ $self->wq_do('query_mset', @io, $lei, $srcs);
+ }
+ # TODO
+ for my $rmt (@{$self->{remotes} // []}) {
+ $self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+ }
+}
+
1;