]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: emit progress and counting via PktOp
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index e577ab09a3508bf9eab3aa7396eca2161cc6a922..95862306cd7a98beba3aca94659f17bd7bf3460a 100644 (file)
@@ -8,12 +8,11 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::PktOp;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
-use Socket qw(MSG_EOR);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
@@ -97,7 +96,7 @@ sub over {}
 sub _mset_more ($$) {
        my ($mset, $mo) = @_;
        my $size = $mset->size;
-       $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+       $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
 # $startq will EOF when query_prepare is done augmenting and allow
@@ -115,16 +114,15 @@ sub query_thread_mset { # for --thread
        my $startq = delete $lei->{startq};
 
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
-       unless ($srch && $over) {
-               my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-               warn "$desc not indexed by Xapian\n";
-               return;
-       }
+       my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+       return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+                               $mset->get_matches_estimated);
                my $ids = $srch->mset_to_artnums($mset, $mo);
                my $ctx = { ids => $ids };
                my $i = 0;
@@ -156,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+                               $mset->size, $mset->get_matches_estimated);
                for my $mitem ($mset->items) {
                        my $smsg = smsg_for($self, $mitem) or next;
                        wait_startq($startq) if $startq;
@@ -174,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd
        $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
        delete @$smsg{qw(From Subject -ds -ts)};
        if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+       ++$lei->{-nr_remote_eml};
+       if (!$lei->{opt}->{quiet}) {
+               my $now = now();
+               my $next = $lei->{-next_progress} //= ($now + 1);
+               if ($now > $next) {
+                       $lei->{-next_progress} = $now + 1;
+                       my $nr = $lei->{-nr_remote_eml};
+                       $lei->err("# $lei->{-current_url} $nr/?");
+               }
+       }
        $each_smsg->($smsg, undef, $eml);
 }
 
@@ -223,6 +233,8 @@ sub query_remote_mboxrd {
        my $tor = $opt->{torsocks} //= 'auto';
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        for my $uri (@$uris) {
+               $lei->{-current_url} = $uri->as_string;
+               $lei->{-nr_remote_eml} = 0;
                $uri->query_form(@qform);
                my $cmd = [ @cmd, $uri->as_string ];
                if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
@@ -246,7 +258,12 @@ sub query_remote_mboxrd {
                                                        $lei, $each_smsg);
                };
                return $lei->fail("E: @$cmd: $@") if $@;
-               next unless $?;
+               if ($? == 0) {
+                       my $nr = $lei->{-nr_remote_eml};
+                       pkt_do($lei->{pkt_op}, 'mset_progress',
+                               $lei->{-current_url}, $nr, $nr);
+                       next;
+               }
                seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
                my $e = do { local $/; <$cerr> } //
                                die "read(curl stderr): $!\n";
@@ -299,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $!
                }
                $lei->start_mua;
        }
+       $lei->{opt}->{quiet} or
+               $lei->err('# ', $lei->{-mset_total} // 0, " matches");
        $lei->dclose;
 }
 
+sub mset_progress { # called via pkt_op/pkt_do from workers
+       my ($lei, $pargs) = @_;
+       my ($desc, $mset_size, $mset_total_est) = @$pargs;
+       return if $lei->{opt}->{quiet};
+       $lei->{-mset_total} += $mset_size;
+       $lei->err("# $desc $mset_size/$mset_total_est");
+}
+
 sub do_post_augment {
        my ($lei, $zpipe, $au_done) = @_;
        my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
@@ -354,8 +381,7 @@ sub query_prepare { # called by wq_do
        delete $lei->{l2m}->{-wq_s1};
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
-       send($lei->{pkt_op}, '.', MSG_EOR) == 1 or
-               die "do_post_augment trigger: $!"
+       pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub fail_handler ($;$$) {
@@ -388,6 +414,7 @@ sub do_query {
                '!' => [ \&fail_handler, $lei ],
                '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
                '' => [ \&query_done, $lei ],
+               'mset_progress' => [ \&mset_progress, $lei ],
        };
        (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
        my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);