]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: emit progress and counting via PktOp
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index 68be8ada5045508efe16ec4b36ea1971f217f863..95862306cd7a98beba3aca94659f17bd7bf3460a 100644 (file)
@@ -8,14 +8,15 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::OpPipe;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
-use PublicInbox::Spawn qw(popen_rd spawn);
+use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
+use PublicInbox::Smsg;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 
 sub new {
@@ -33,7 +34,7 @@ sub attach_external {
        my $srch = $ibxish->search or
                return warn("$desc not indexed for Xapian\n");
        my @shards = $srch->xdb_shards_flat or
-               return warn("$desc has no Xapian shardsXapian\n");
+               return warn("$desc has no Xapian shards\n");
 
        if (delete $self->{xdb}) { # XXX: do we need this?
                # clobber existing {xdb} if amending
@@ -95,7 +96,7 @@ sub over {}
 sub _mset_more ($$) {
        my ($mset, $mo) = @_;
        my $size = $mset->size;
-       $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+       $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
 # $startq will EOF when query_prepare is done augmenting and allow
@@ -109,21 +110,19 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
        my ($self, $lei, $ibxish) = @_;
        local $0 = "$0 query_thread_mset";
-       my $startq = delete $self->{5};
-       my %sig = $lei->atfork_child_wq($self);
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
+       my $startq = delete $lei->{startq};
 
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
-       unless ($srch && $over) {
-               my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-               warn "$desc not indexed by Xapian\n";
-               return;
-       }
+       my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+       return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+                               $mset->get_matches_estimated);
                my $ids = $srch->mset_to_artnums($mset, $mo);
                my $ctx = { ids => $ids };
                my $i = 0;
@@ -145,9 +144,8 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
        my ($self, $lei) = @_;
        local $0 = "$0 query_mset";
-       my $startq = delete $self->{5};
-       my %sig = $lei->atfork_child_wq($self);
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
+       my $startq = delete $lei->{startq};
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        for my $loc (locals($self)) {
@@ -156,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+                               $mset->size, $mset->get_matches_estimated);
                for my $mitem ($mset->items) {
                        my $smsg = smsg_for($self, $mitem) or next;
                        wait_startq($startq) if $startq;
@@ -173,7 +173,17 @@ sub each_eml { # callback for MboxReader->mboxrd
        $smsg->parse_references($eml, mids($eml));
        $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
        delete @$smsg{qw(From Subject -ds -ts)};
-       if (my $startq = delete($self->{5})) { wait_startq($startq) }
+       if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+       ++$lei->{-nr_remote_eml};
+       if (!$lei->{opt}->{quiet}) {
+               my $now = now();
+               my $next = $lei->{-next_progress} //= ($now + 1);
+               if ($now > $next) {
+                       $lei->{-next_progress} = $now + 1;
+                       my $nr = $lei->{-nr_remote_eml};
+                       $lei->err("# $lei->{-current_url} $nr/?");
+               }
+       }
        $each_smsg->($smsg, undef, $eml);
 }
 
@@ -187,12 +197,11 @@ sub kill_reap {
 sub query_remote_mboxrd {
        my ($self, $lei, $uris) = @_;
        local $0 = "$0 query_remote_mboxrd";
-       my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
        my ($opt, $env) = @$lei{qw(opt env)};
        my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
        push(@qform, t => 1) if $opt->{thread};
-       my @cmd = (qw(curl -sSf -d), '');
+       my @cmd = ($self->{curl}, qw(-sSf -d), '');
        my $verbose = $opt->{verbose};
        my $reap;
        my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
@@ -224,14 +233,22 @@ sub query_remote_mboxrd {
        my $tor = $opt->{torsocks} //= 'auto';
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        for my $uri (@$uris) {
+               $lei->{-current_url} = $uri->as_string;
+               $lei->{-nr_remote_eml} = 0;
                $uri->query_form(@qform);
                my $cmd = [ @cmd, $uri->as_string ];
                if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
                                (($env->{LD_PRELOAD}//'') !~ /torsocks/)) {
-                       unshift @$cmd, 'torsocks';
+                       unshift @$cmd, which('torsocks');
                } elsif (PublicInbox::Config::git_bool($tor)) {
-                       unshift @$cmd, 'torsocks';
+                       unshift @$cmd, which('torsocks');
                }
+
+               # continue anyways if torsocks is missing; a proxy may be
+               # specified via CLI, curlrc, environment variable, or even
+               # firewall rule
+               shift(@$cmd) if !$cmd->[0];
+
                $lei->err("# @$cmd") if $verbose;
                $? = 0;
                my $fh = popen_rd($cmd, $env, $rdr);
@@ -241,7 +258,12 @@ sub query_remote_mboxrd {
                                                        $lei, $each_smsg);
                };
                return $lei->fail("E: @$cmd: $@") if $@;
-               next unless $?;
+               if ($? == 0) {
+                       my $nr = $lei->{-nr_remote_eml};
+                       pkt_do($lei->{pkt_op}, 'mset_progress',
+                               $lei->{-current_url}, $nr, $nr);
+                       next;
+               }
                seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
                my $e = do { local $/; <$cerr> } //
                                die "read(curl stderr): $!\n";
@@ -280,7 +302,7 @@ sub query_done { # EOF callback
        my $has_l2m = exists $lei->{l2m};
        for my $f (qw(lxs l2m)) {
                my $wq = delete $lei->{$f} or next;
-               $wq->wq_wait_old;
+               $wq->wq_wait_old($lei);
        }
        $lei->{ovv}->ovv_end($lei);
        if ($has_l2m) { # close() calls LeiToMail reap_compress
@@ -294,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $!
                }
                $lei->start_mua;
        }
+       $lei->{opt}->{quiet} or
+               $lei->err('# ', $lei->{-mset_total} // 0, " matches");
        $lei->dclose;
 }
 
+sub mset_progress { # called via pkt_op/pkt_do from workers
+       my ($lei, $pargs) = @_;
+       my ($desc, $mset_size, $mset_total_est) = @$pargs;
+       return if $lei->{opt}->{quiet};
+       $lei->{-mset_total} += $mset_size;
+       $lei->err("# $desc $mset_size/$mset_total_est");
+}
+
 sub do_post_augment {
        my ($lei, $zpipe, $au_done) = @_;
        my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
@@ -345,79 +377,80 @@ sub start_query { # always runs in main (lei-daemon) process
 sub query_prepare { # called by wq_do
        my ($self, $lei) = @_;
        local $0 = "$0 query_prepare";
-       my %sig = $lei->atfork_child_wq($self);
-       -p $lei->{0} or die "BUG: \$done pipe expected";
-       local @SIG{keys %sig} = values %sig;
+       $lei->atfork_child_wq($self);
+       delete $lei->{l2m}->{-wq_s1};
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
-       syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!";
+       pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-       my ($lei) = @_;
-       my $lxs = delete $lei->{lxs};
-       if ($lxs && $lxs->wq_kill_old) {
-               kill 'PIPE', $$;
-               $lxs->wq_wait_old;
+sub fail_handler ($;$$) {
+       my ($lei, $code, $io) = @_;
+       if (my $lxs = delete $lei->{lxs}) {
+               $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
        }
-       close(delete $lei->{1}) if $lei->{1};
+       close($io) if $io; # needed to avoid warnings on SIGPIPE
+       $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+       fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
 sub do_query {
-       my ($self, $lei_orig) = @_;
-       my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-       $io[0] = undef;
-       pipe(my $done, $io[0]) or die "pipe $!";
-       $lei_orig->{1}->autoflush(1);
-
-       $lei_orig->event_step_init; # wait for shutdowns
-       my $done_op = {
-               '' => [ \&query_done, $lei_orig ],
-               '!' => [ \&sigpipe_handler, $lei_orig ]
-       };
-       my $in_loop = exists $lei_orig->{sock};
-       $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
+       my ($self, $lei) = @_;
+       $lei->{1}->autoflush(1);
+       my ($au_done, $zpipe);
        my $l2m = $lei->{l2m};
        if ($l2m) {
-               # may redirect $lei->{1} for mbox
-               my $zpipe = $l2m->pre_augment($lei_orig);
-               $io[1] = $lei_orig->{1};
-               pipe(my ($startq, $au_done)) or die "pipe: $!";
-               $done_op->{'.'} = [ \&do_post_augment, $lei_orig,
-                                       $zpipe, $au_done ];
-               local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1}
-               die "BUG: unexpected \$io[5]: $io[5]" if $io[5];
-               $self->wq_do('query_prepare', \@io, $lei);
-               fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ
-               $io[5] = $startq;
+               pipe($lei->{startq}, $au_done) or die "pipe: $!";
+               # 1031: F_SETPIPE_SZ
+               fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+               $zpipe = $l2m->pre_augment($lei);
+       }
+       my $in_loop = exists $lei->{sock};
+       my $ops = {
+               '|' => [ \&sigpipe_handler, $lei ],
+               '!' => [ \&fail_handler, $lei ],
+               '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
+               '' => [ \&query_done, $lei ],
+               'mset_progress' => [ \&mset_progress, $lei ],
+       };
+       (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
+       my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
+       delete($lei->{pkt_op});
+
+       $lei->event_step_init; # wait for shutdowns
+       if ($l2m) {
+               $self->wq_do('query_prepare', \@io, $lei_ipc);
                $io[1] = $zpipe->[1] if $zpipe;
        }
-       start_query($self, \@io, $lei);
+       start_query($self, \@io, $lei_ipc);
        $self->wq_close(1);
        unless ($in_loop) {
-               # for the $lei->atfork_child_wq PIPE handler:
-               while ($done->{sock}) { $done->event_step }
+               # for the $lei_ipc->atfork_child_wq PIPE handler:
+               while ($op->{sock}) { $op->event_step }
        }
 }
 
-sub ipc_atfork_prepare {
-       my ($self) = @_;
-       if (exists $self->{remotes}) {
+sub add_uri {
+       my ($self, $uri) = @_;
+       if (my $curl = $self->{curl} //= which('curl') // 0) {
                require PublicInbox::MboxReader;
                require IO::Uncompress::Gunzip;
+               push @{$self->{remotes}}, $uri;
+       } else {
+               warn "curl missing, ignoring $uri\n";
        }
-       # FDS: (0: done_wr, 1: stdout|mbox, 2: stderr,
-       #       3: sock, 4: $l2m->{-wq_s1}, 5: $startq)
-       $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
 sub prepare_external {
        my ($self, $loc, $boost) = @_; # n.b. already ordered by boost
        if (ref $loc) { # already a URI, or PublicInbox::Inbox-like object
-               return push(@{$self->{remotes}}, $loc) if $loc->can('scheme');
+               return add_uri($self, $loc) if $loc->can('scheme');
        } elsif ($loc =~ m!\Ahttps?://!) {
                require URI;
-               return push(@{$self->{remotes}}, URI->new($loc));
+               return add_uri($self, URI->new($loc));
        } elsif (-f "$loc/ei.lock") {
                require PublicInbox::ExtSearch;
                $loc = PublicInbox::ExtSearch->new($loc);