]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: emit progress and counting via PktOp
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index f630e79a9b1cb48687e5e1c8f2fbb3d3e615180b..95862306cd7a98beba3aca94659f17bd7bf3460a 100644 (file)
@@ -8,8 +8,8 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::OpPipe;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
@@ -96,7 +96,7 @@ sub over {}
 sub _mset_more ($$) {
        my ($mset, $mo) = @_;
        my $size = $mset->size;
-       $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+       $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
 # $startq will EOF when query_prepare is done augmenting and allow
@@ -114,16 +114,15 @@ sub query_thread_mset { # for --thread
        my $startq = delete $lei->{startq};
 
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
-       unless ($srch && $over) {
-               my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-               warn "$desc not indexed by Xapian\n";
-               return;
-       }
+       my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+       return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+                               $mset->get_matches_estimated);
                my $ids = $srch->mset_to_artnums($mset, $mo);
                my $ctx = { ids => $ids };
                my $i = 0;
@@ -155,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+                               $mset->size, $mset->get_matches_estimated);
                for my $mitem ($mset->items) {
                        my $smsg = smsg_for($self, $mitem) or next;
                        wait_startq($startq) if $startq;
@@ -173,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd
        $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
        delete @$smsg{qw(From Subject -ds -ts)};
        if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+       ++$lei->{-nr_remote_eml};
+       if (!$lei->{opt}->{quiet}) {
+               my $now = now();
+               my $next = $lei->{-next_progress} //= ($now + 1);
+               if ($now > $next) {
+                       $lei->{-next_progress} = $now + 1;
+                       my $nr = $lei->{-nr_remote_eml};
+                       $lei->err("# $lei->{-current_url} $nr/?");
+               }
+       }
        $each_smsg->($smsg, undef, $eml);
 }
 
@@ -222,6 +233,8 @@ sub query_remote_mboxrd {
        my $tor = $opt->{torsocks} //= 'auto';
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        for my $uri (@$uris) {
+               $lei->{-current_url} = $uri->as_string;
+               $lei->{-nr_remote_eml} = 0;
                $uri->query_form(@qform);
                my $cmd = [ @cmd, $uri->as_string ];
                if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
@@ -245,7 +258,12 @@ sub query_remote_mboxrd {
                                                        $lei, $each_smsg);
                };
                return $lei->fail("E: @$cmd: $@") if $@;
-               next unless $?;
+               if ($? == 0) {
+                       my $nr = $lei->{-nr_remote_eml};
+                       pkt_do($lei->{pkt_op}, 'mset_progress',
+                               $lei->{-current_url}, $nr, $nr);
+                       next;
+               }
                seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
                my $e = do { local $/; <$cerr> } //
                                die "read(curl stderr): $!\n";
@@ -298,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $!
                }
                $lei->start_mua;
        }
+       $lei->{opt}->{quiet} or
+               $lei->err('# ', $lei->{-mset_total} // 0, " matches");
        $lei->dclose;
 }
 
+sub mset_progress { # called via pkt_op/pkt_do from workers
+       my ($lei, $pargs) = @_;
+       my ($desc, $mset_size, $mset_total_est) = @$pargs;
+       return if $lei->{opt}->{quiet};
+       $lei->{-mset_total} += $mset_size;
+       $lei->err("# $desc $mset_size/$mset_total_est");
+}
+
 sub do_post_augment {
        my ($lei, $zpipe, $au_done) = @_;
        my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
@@ -353,7 +381,7 @@ sub query_prepare { # called by wq_do
        delete $lei->{l2m}->{-wq_s1};
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
-       syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
+       pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
 sub fail_handler ($;$$) {
@@ -380,20 +408,20 @@ sub do_query {
                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
                $zpipe = $l2m->pre_augment($lei);
        }
-       pipe(my $done, $lei->{op_pipe}) or die "pipe $!";
+       my $in_loop = exists $lei->{sock};
+       my $ops = {
+               '|' => [ \&sigpipe_handler, $lei ],
+               '!' => [ \&fail_handler, $lei ],
+               '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
+               '' => [ \&query_done, $lei ],
+               'mset_progress' => [ \&mset_progress, $lei ],
+       };
+       (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
        my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
-       delete($lei->{op_pipe});
+       delete($lei->{pkt_op});
 
        $lei->event_step_init; # wait for shutdowns
-       my $done_op = {
-               '' => [ \&query_done, $lei ],
-               '|' => [ \&sigpipe_handler, $lei ],
-               '!' => [ \&fail_handler, $lei ]
-       };
-       my $in_loop = exists $lei->{sock};
-       $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
        if ($l2m) {
-               $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ];
                $self->wq_do('query_prepare', \@io, $lei_ipc);
                $io[1] = $zpipe->[1] if $zpipe;
        }
@@ -401,7 +429,7 @@ sub do_query {
        $self->wq_close(1);
        unless ($in_loop) {
                # for the $lei_ipc->atfork_child_wq PIPE handler:
-               while ($done->{sock}) { $done->event_step }
+               while ($op->{sock}) { $op->event_step }
        }
 }