X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiXSearch.pm;h=95862306cd7a98beba3aca94659f17bd7bf3460a;hb=7349713101700e488231ad9ffece8ee42de0928c;hp=e577ab09a3508bf9eab3aa7396eca2161cc6a922;hpb=a9ba3d10482daea78739f3da6c8d7739ac8af3fc;p=public-inbox.git diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e577ab09..95862306 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,12 +8,11 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); -use PublicInbox::DS qw(dwaitpid); -use PublicInbox::PktOp; +use PublicInbox::DS qw(dwaitpid now); +use PublicInbox::PktOp qw(pkt_do); use PublicInbox::Import; use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); -use Socket qw(MSG_EOR); use PublicInbox::Search qw(xap_terms); use PublicInbox::Spawn qw(popen_rd spawn which); use PublicInbox::MID qw(mids); @@ -97,7 +96,7 @@ sub over {} sub _mset_more ($$) { my ($mset, $mo) = @_; my $size = $mset->size; - $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000)); + $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit}); } # $startq will EOF when query_prepare is done augmenting and allow @@ -115,16 +114,15 @@ sub query_thread_mset { # for --thread my $startq = delete $lei->{startq}; my ($srch, $over) = ($ibxish->search, $ibxish->over); - unless ($srch && $over) { - my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; - warn "$desc not indexed by Xapian\n"; - return; - } + my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; + return warn("$desc not indexed by Xapian\n") unless ($srch && $over); my $mo = { %{$lei->{mset_opt}} }; my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish); do { $mset = $srch->mset($mo->{qstr}, $mo); + pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size, + $mset->get_matches_estimated); my $ids = $srch->mset_to_artnums($mset, $mo); my $ctx = { ids => $ids }; my $i = 0; @@ -156,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self); do { $mset = $self->mset($mo->{qstr}, $mo); + pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch', + $mset->size, $mset->get_matches_estimated); for my $mitem ($mset->items) { my $smsg = smsg_for($self, $mitem) or next; wait_startq($startq) if $startq; @@ -174,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd $smsg->{$_} //= '' for qw(from to cc ds subject references mid); delete @$smsg{qw(From Subject -ds -ts)}; if (my $startq = delete($lei->{startq})) { wait_startq($startq) } + ++$lei->{-nr_remote_eml}; + if (!$lei->{opt}->{quiet}) { + my $now = now(); + my $next = $lei->{-next_progress} //= ($now + 1); + if ($now > $next) { + $lei->{-next_progress} = $now + 1; + my $nr = $lei->{-nr_remote_eml}; + $lei->err("# $lei->{-current_url} $nr/?"); + } + } $each_smsg->($smsg, undef, $eml); } @@ -223,6 +233,8 @@ sub query_remote_mboxrd { my $tor = $opt->{torsocks} //= 'auto'; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); for my $uri (@$uris) { + $lei->{-current_url} = $uri->as_string; + $lei->{-nr_remote_eml} = 0; $uri->query_form(@qform); my $cmd = [ @cmd, $uri->as_string ]; if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' && @@ -246,7 +258,12 @@ sub query_remote_mboxrd { $lei, $each_smsg); }; return $lei->fail("E: @$cmd: $@") if $@; - next unless $?; + if ($? == 0) { + my $nr = $lei->{-nr_remote_eml}; + pkt_do($lei->{pkt_op}, 'mset_progress', + $lei->{-current_url}, $nr, $nr); + next; + } seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n"; my $e = do { local $/; <$cerr> } // die "read(curl stderr): $!\n"; @@ -299,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $! } $lei->start_mua; } + $lei->{opt}->{quiet} or + $lei->err('# ', $lei->{-mset_total} // 0, " matches"); $lei->dclose; } +sub mset_progress { # called via pkt_op/pkt_do from workers + my ($lei, $pargs) = @_; + my ($desc, $mset_size, $mset_total_est) = @$pargs; + return if $lei->{opt}->{quiet}; + $lei->{-mset_total} += $mset_size; + $lei->err("# $desc $mset_size/$mset_total_est"); +} + sub do_post_augment { my ($lei, $zpipe, $au_done) = @_; my $l2m = $lei->{l2m} or die 'BUG: no {l2m}'; @@ -354,8 +381,7 @@ sub query_prepare { # called by wq_do delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - send($lei->{pkt_op}, '.', MSG_EOR) == 1 or - die "do_post_augment trigger: $!" + pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!" } sub fail_handler ($;$$) { @@ -388,6 +414,7 @@ sub do_query { '!' => [ \&fail_handler, $lei ], '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ], '' => [ \&query_done, $lei ], + 'mset_progress' => [ \&mset_progress, $lei ], }; (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop); my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);