]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: query: ensure pager exit is instantaneous
authorEric Wong <e@80x24.org>
Sun, 10 Jan 2021 12:15:11 +0000 (12:15 +0000)
committerEric Wong <e@80x24.org>
Tue, 12 Jan 2021 03:51:42 +0000 (03:51 +0000)
Improve interactivity and user experience by allowing the user
to return to the terminal immediately when the pager is exited
(e.g. hitting the `q' key in less(1)).

This is a massive change which restructures query handling to
allow parallel search when --thread expansion is in use and
offloading to a separate worker when --thread is not in use.

The Xapian query offload changes allow us to reenter the event
loop right away once the search(es) are shipped off to the work
queue workers.

This means the main lei-daemon process can forget the lei(1)
client socket immediately once it's handed off to worker
processes.

We now unblock SIGPIPE in query workers and send an exit(141)
response to the lei(1) client socket to denote SIGPIPE.

This also allows parallelization for users using "lei q" from
multiple terminals.

JSON output is currently broken and will need to be restructured
for more flexibility and fork-safety.

lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiXSearch.pm

index 8a3120c9d47e762efd452c93fa7bb44659e893f4..be5b2f453f213a67351686315bf228d679d5dd2e 100644 (file)
@@ -234,6 +234,9 @@ sub wq_worker_loop ($) {
        my $len = $self->{wq_req_len} // (4096 * 33);
        my ($rec, $sub, @args);
        my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+       local $SIG{PIPE} = sub {
+               die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+       };
        until ($self->{-wq_quit}) {
                my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
                my $i = 0;
@@ -242,6 +245,7 @@ sub wq_worker_loop ($) {
                        my $mode = shift(@m);
                        if (open(my $fh, $mode, $fd)) {
                                $self->{$i++} = $fh;
+                               $fh->autoflush(1);
                        } else {
                                die "$$ open($mode$fd) (FD:$i): $!";
                        }
@@ -251,8 +255,10 @@ sub wq_worker_loop ($) {
                        die "thaw error on buffer of size:".length($buf);
                ($sub, @args) = @$rec;
                eval { $self->$sub(@args) };
-               warn "$$ wq_worker: $@" if $@;
-               delete @$self{0, 1, 2};
+               warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+               undef $sub; # quiet SIG{PIPE} handler
+               # need to close explicitly to avoid warnings after SIGPIPE
+               close($_) for (delete(@$self{0..2}));
        }
 }
 
@@ -284,8 +290,8 @@ sub _wq_worker_start ($$) {
                PublicInbox::DS::sig_setmask($oldset);
                my $on_destroy = $self->ipc_atfork_child;
                eval { wq_worker_loop($self) };
-               die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
-               exit;
+               warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+               exit($@ ? 1 : 0);
        } else {
                $self->{-wq_workers}->{$pid} = \undef;
        }
index 170231917ccf167ad7bb46842237181b66d259c6..f8b8cd4a0f62b962bc8f687f38ccb9a5d5a041a3 100644 (file)
@@ -269,6 +269,33 @@ sub fail ($$;$) {
        undef;
 }
 
+# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+sub atfork_child_wq {
+       my ($self, $wq) = @_;
+       $self->{sock} //= $wq->{0};
+       $self->{$_} //= $wq->{$_} for (0..2);
+       my $oldpipe = $SIG{PIPE};
+       (
+               __WARN__ => sub { err($self, @_) },
+               PIPE => sub {
+                       $self->x_it(141);
+                       $oldpipe->() if ref($oldpipe) eq 'CODE';
+               }
+       );
+}
+
+# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
+sub atfork_prepare_wq {
+       my ($self, $wq) = @_;
+       if ($wq->wq_workers) {
+               my $ret = bless { %$self }, ref($self);
+               my $in = delete $ret->{0};
+               ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+       } else {
+               ($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+       }
+}
+
 sub _help ($;$) {
        my ($self, $errmsg) = @_;
        my $cmd = $self->{cmd} // 'COMMAND';
@@ -608,8 +635,8 @@ sub start_pager {
        $self->{1} = $wpager;
        $self->{2} = $wpager if -t $self->{2};
        my $pid = spawn([$pager], $env, $rdr);
-       dwaitpid($pid, undef, $self->{sock});
        $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
+       [ $pid, @$rdr{1, 2} ];
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -675,6 +702,8 @@ sub event_step {
 
 sub noop {}
 
+our $oldset; sub oldset { $oldset }
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
        my ($path, $errno, $nfd) = @_;
@@ -691,7 +720,7 @@ sub lazy_start {
        my @st = stat($path) or die "stat($path): $!";
        my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
        pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-       my $oldset = PublicInbox::DS::block_signals();
+       local $oldset = PublicInbox::DS::block_signals();
        if ($nfd == 1) {
                require PublicInbox::CmdIPC1;
                $recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
@@ -737,6 +766,7 @@ sub lazy_start {
        };
        my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
        local %SIG = (%SIG, %$sig) if !$sigfd;
+       local $SIG{PIPE} = 'IGNORE';
        if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
                PublicInbox::DS->SetLoopTimeout(5000);
        } else {
index f69dccadbd5b167357e849cc6936f2e8d2477f13..040c284d5d08075f040cac2392d136dcd8fe82c1 100644 (file)
@@ -8,7 +8,7 @@ use v5.10.1;
 use PublicInbox::MID qw($MID_EXTRACT);
 use POSIX qw(strftime);
 use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
@@ -61,37 +61,30 @@ sub lei_q {
        my $sto = $self->_lei_store(1);
        my $cfg = $self->_lei_cfg(1);
        my $opt = $self->{opt};
-       my $qstr = join(' ', map {;
-               # Consider spaces in argv to be for phrase search in Xapian.
-               # In other words, the users should need only care about
-               # normal shell quotes and not have to learn Xapian quoting.
-               /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
-       } @argv);
-       $opt->{limit} //= 10000;
-       my $lxs;
        require PublicInbox::LeiDedupe;
        my $dd = PublicInbox::LeiDedupe->new($self);
 
        # --local is enabled by default
-       my @src = $opt->{'local'} ? ($sto->search) : ();
+       # src: LeiXSearch || LeiSearch || Inbox
+       my @srcs = $opt->{'local'} ? ($sto->search) : ();
+       require PublicInbox::LeiXSearch;
+       my $lxs = PublicInbox::LeiXSearch->new;
 
        # --external is enabled by default, but allow --no-external
        if ($opt->{external} // 1) {
-               $self->_externals_each(\&_vivify_external, \@src);
-               # {tid} is not unique between indices, so we have to search
-               # each src individually
-               if (!$opt->{thread}) {
-                       require PublicInbox::LeiXSearch;
-                       my $lxs = PublicInbox::LeiXSearch->new;
-                       # local is always first
-                       $lxs->attach_external($_) for @src;
-                       @src = ($lxs);
-               }
+               $self->_externals_each(\&_vivify_external, \@srcs);
        }
-       my $out = $self->{output} // '-';
+       my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+       $j = 1 if !$opt->{thread};
+       if ($self->{pid}) {
+               $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+                       // $self->wq_workers($j);
+       }
+       my $out = $opt->{output} // '-';
        $out = 'json:/dev/stdout' if $out eq '-';
        my $isatty = -t $self->{1};
-       $self->start_pager if $isatty;
+       # no forking workers after this
+       my $pid_old12 = $self->start_pager if $isatty;
        my $json = substr($out, 0, 5) eq 'json:' ?
                ref(PublicInbox::Config->json)->new : undef;
        if ($json) {
@@ -104,10 +97,14 @@ sub lei_q {
                $json->canonical;
        }
 
-       # src: LeiXSearch || LeiSearch || Inbox
        my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
-       delete $mset_opt{limit} if $opt->{limit} < 0;
        $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+       $mset_opt{qstr} = join(' ', map {;
+               # Consider spaces in argv to be for phrase search in Xapian.
+               # In other words, the users should need only care about
+               # normal shell quotes and not have to learn Xapian quoting.
+               /\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+       } @argv);
        if (defined(my $sort = $opt->{'sort'})) {
                if ($sort eq 'relevance') {
                        $mset_opt{relevance} = 1;
@@ -123,59 +120,12 @@ sub lei_q {
        # descending docid order
        $mset_opt{relevance} //= -2 if $opt->{thread};
        # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
-       # even w/o pretty, do the equivalent of a --pretty=oneline
-       # output so "lei q SEARCH_TERMS | wc -l" can be useful:
-       my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
-       my $buf;
-
-       # we can generate too many records to hold in RAM, so we stream
-       # and fake a JSON array starting here:
-       $self->out('[') if $json;
-       my $emit_cb = sub {
-               my ($smsg) = @_;
-               delete @$smsg{qw(tid num)}; # only makes sense if single src
-               chomp($buf = $json->encode(_smsg_unbless($smsg)));
-       };
-       $dd->prepare_dedupe;
-       for my $src (@src) {
-               my $srch = $src->search;
-               my $over = $src->over;
-               my $smsg_for = $src->can('smsg_for'); # LeiXSearch
-               my $mo = { %mset_opt };
-               my $mset = $srch->mset($qstr, $mo);
-               my $ctx = {};
-               if ($smsg_for) {
-                       for my $it ($mset->items) {
-                               my $smsg = $smsg_for->($srch, $it) or next;
-                               next if $dd->is_smsg_dup($smsg);
-                               $self->out($buf .= $ORS) if defined $buf;
-                               $smsg->{relevance} = get_pct($it);
-                               $emit_cb->($smsg);
-                       }
-               } else { # --thread
-                       my $ids = $srch->mset_to_artnums($mset, $mo);
-                       $ctx->{ids} = $ids;
-                       my $i = 0;
-                       my %n2p = map {
-                               ($ids->[$i++], get_pct($_));
-                       } $mset->items;
-                       undef $mset;
-                       while ($over && $over->expand_thread($ctx)) {
-                               for my $n (@{$ctx->{xids}}) {
-                                       my $t = $over->get_art($n) or next;
-                                       next if $dd->is_smsg_dup($t);
-                                       if (my $p = delete $n2p{$t->{num}}) {
-                                               $t->{relevance} = $p;
-                                       }
-                                       $self->out($buf .= $ORS);
-                                       $emit_cb->($t);
-                               }
-                               @{$ctx->{xids}} = ();
-                       }
-               }
+       $self->{mset_opt} = \%mset_opt;
+       $lxs->do_query($self, \@srcs);
+       if ($pid_old12) {
+               $self->{$_} = $pid_old12->[$_] for (1, 2);
+               dwaitpid($pid_old12->[0], undef, $self->{sock});
        }
-       $self->out($buf .= "]\n"); # done
 }
 
 1;
index b670bc2f9e216363c82fdabca5ccceb5459cba12..a3010efe6927f0bfe41de59301f652dbd5bdd410 100644 (file)
@@ -7,7 +7,8 @@
 package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::Search qw(get_pct);
 
 sub new {
        my ($class) = @_;
@@ -83,4 +84,81 @@ sub recent {
 
 sub over {}
 
+sub _mset_more ($$) {
+       my ($mset, $mo) = @_;
+       my $size = $mset->size;
+       $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+       my ($self, $lei, $ibxish) = @_;
+       my ($srch, $over) = ($ibxish->search, $ibxish->over);
+       unless ($srch && $over) {
+               my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+               warn "$desc not indexed by Xapian\n";
+               return;
+       }
+       local %SIG = (%SIG, $lei->atfork_child_wq($self));
+       my $mo = { %{$lei->{mset_opt}} };
+       my $mset;
+       do {
+               $mset = $srch->mset($mo->{qstr}, $mo);
+               my $ids = $srch->mset_to_artnums($mset, $mo);
+               my $ctx = { ids => $ids };
+               my $i = 0;
+               my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+               while ($over->expand_thread($ctx)) {
+                       for my $n (@{$ctx->{xids}}) {
+                               my $smsg = $over->get_art($n) or next;
+                               # next if $dd->is_smsg_dup($smsg); TODO
+                               if (my $p = delete $n2p{$smsg->{num}}) {
+                                       $smsg->{relevance} = $p;
+                               }
+                               print { $self->{1} } Dumper($smsg);
+                               # $self->out($buf .= $ORS);
+                               # $emit_cb->($smsg);
+                       }
+                       @{$ctx->{xids}} = ();
+               }
+       } while (_mset_more($mset, $mo));
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+       my ($self, $lei, $srcs) = @_;
+       my $mo = { %{$lei->{mset_opt}} };
+       my $mset;
+       local %SIG = (%SIG, $lei->atfork_child_wq($self));
+       $self->attach_external($_) for @$srcs;
+       do {
+               $mset = $self->mset($mo->{qstr}, $mo);
+               for my $it ($mset->items) {
+                       my $smsg = smsg_for($self, $it) or next;
+                       # next if $dd->is_smsg_dup($smsg);
+                       $smsg->{relevance} = get_pct($it);
+                       use Data::Dumper;
+                       print { $self->{1} } Dumper($smsg);
+                       # $self->out($buf .= $ORS) if defined $buf;
+                       #$emit_cb->($smsg);
+               }
+       } while (_mset_more($mset, $mo));
+}
+
+sub do_query {
+       my ($self, $lei_orig, $srcs) = @_;
+       my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+       $io[1]->autoflush(1);
+       $io[2]->autoflush(1);
+       if ($lei->{opt}->{thread}) {
+               for my $ibxish (@$srcs) {
+                       $self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+               }
+       } else {
+               $self->wq_do('query_mset', @io, $lei, $srcs);
+       }
+       # TODO
+       for my $rmt (@{$self->{remotes} // []}) {
+               $self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+       }
+}
+
 1;