]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LeiXSearch.pm
lei q: emit progress and counting via PktOp
[public-inbox.git] / lib / PublicInbox / LeiXSearch.pm
index de82a7daee1d4b5a20227e3f4819bc69d330cacb..95862306cd7a98beba3aca94659f17bd7bf3460a 100644 (file)
@@ -8,14 +8,15 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::OpPipe;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
 use PublicInbox::Import;
 use File::Temp 0.19 (); # 0.19 for ->newdir
 use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
+use PublicInbox::Smsg;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 
 sub new {
@@ -95,7 +96,7 @@ sub over {}
 sub _mset_more ($$) {
        my ($mset, $mo) = @_;
        my $size = $mset->size;
-       $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+       $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
 }
 
 # $startq will EOF when query_prepare is done augmenting and allow
@@ -113,16 +114,15 @@ sub query_thread_mset { # for --thread
        my $startq = delete $lei->{startq};
 
        my ($srch, $over) = ($ibxish->search, $ibxish->over);
-       unless ($srch && $over) {
-               my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
-               warn "$desc not indexed by Xapian\n";
-               return;
-       }
+       my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+       return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
        my $mo = { %{$lei->{mset_opt}} };
        my $mset;
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
        do {
                $mset = $srch->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+                               $mset->get_matches_estimated);
                my $ids = $srch->mset_to_artnums($mset, $mo);
                my $ctx = { ids => $ids };
                my $i = 0;
@@ -154,6 +154,8 @@ sub query_mset { # non-parallel for non-"--thread" users
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
        do {
                $mset = $self->mset($mo->{qstr}, $mo);
+               pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+                               $mset->size, $mset->get_matches_estimated);
                for my $mitem ($mset->items) {
                        my $smsg = smsg_for($self, $mitem) or next;
                        wait_startq($startq) if $startq;
@@ -172,6 +174,16 @@ sub each_eml { # callback for MboxReader->mboxrd
        $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
        delete @$smsg{qw(From Subject -ds -ts)};
        if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+       ++$lei->{-nr_remote_eml};
+       if (!$lei->{opt}->{quiet}) {
+               my $now = now();
+               my $next = $lei->{-next_progress} //= ($now + 1);
+               if ($now > $next) {
+                       $lei->{-next_progress} = $now + 1;
+                       my $nr = $lei->{-nr_remote_eml};
+                       $lei->err("# $lei->{-current_url} $nr/?");
+               }
+       }
        $each_smsg->($smsg, undef, $eml);
 }
 
@@ -221,6 +233,8 @@ sub query_remote_mboxrd {
        my $tor = $opt->{torsocks} //= 'auto';
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
        for my $uri (@$uris) {
+               $lei->{-current_url} = $uri->as_string;
+               $lei->{-nr_remote_eml} = 0;
                $uri->query_form(@qform);
                my $cmd = [ @cmd, $uri->as_string ];
                if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
@@ -244,7 +258,12 @@ sub query_remote_mboxrd {
                                                        $lei, $each_smsg);
                };
                return $lei->fail("E: @$cmd: $@") if $@;
-               next unless $?;
+               if ($? == 0) {
+                       my $nr = $lei->{-nr_remote_eml};
+                       pkt_do($lei->{pkt_op}, 'mset_progress',
+                               $lei->{-current_url}, $nr, $nr);
+                       next;
+               }
                seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
                my $e = do { local $/; <$cerr> } //
                                die "read(curl stderr): $!\n";
@@ -283,7 +302,7 @@ sub query_done { # EOF callback
        my $has_l2m = exists $lei->{l2m};
        for my $f (qw(lxs l2m)) {
                my $wq = delete $lei->{$f} or next;
-               $wq->wq_wait_old;
+               $wq->wq_wait_old($lei);
        }
        $lei->{ovv}->ovv_end($lei);
        if ($has_l2m) { # close() calls LeiToMail reap_compress
@@ -297,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $!
                }
                $lei->start_mua;
        }
+       $lei->{opt}->{quiet} or
+               $lei->err('# ', $lei->{-mset_total} // 0, " matches");
        $lei->dclose;
 }
 
+sub mset_progress { # called via pkt_op/pkt_do from workers
+       my ($lei, $pargs) = @_;
+       my ($desc, $mset_size, $mset_total_est) = @$pargs;
+       return if $lei->{opt}->{quiet};
+       $lei->{-mset_total} += $mset_size;
+       $lei->err("# $desc $mset_size/$mset_total_est");
+}
+
 sub do_post_augment {
        my ($lei, $zpipe, $au_done) = @_;
        my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
@@ -352,17 +381,20 @@ sub query_prepare { # called by wq_do
        delete $lei->{l2m}->{-wq_s1};
        eval { $lei->{l2m}->do_augment($lei) };
        $lei->fail($@) if $@;
-       syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
+       pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-       my ($lei) = @_;
-       my $lxs = delete $lei->{lxs};
-       if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
-               $lxs->wq_wait_old;
+sub fail_handler ($;$$) {
+       my ($lei, $code, $io) = @_;
+       if (my $lxs = delete $lei->{lxs}) {
+               $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
        }
-       close(delete $lei->{1}) if $lei->{1};
-       $lei->x_it(13);
+       close($io) if $io; # needed to avoid warnings on SIGPIPE
+       $lei->x_it($code // (1 >> 8));
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+       fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
 sub do_query {
@@ -376,19 +408,20 @@ sub do_query {
                fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
                $zpipe = $l2m->pre_augment($lei);
        }
-       pipe(my $done, $lei->{op_pipe}) or die "pipe $!";
+       my $in_loop = exists $lei->{sock};
+       my $ops = {
+               '|' => [ \&sigpipe_handler, $lei ],
+               '!' => [ \&fail_handler, $lei ],
+               '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
+               '' => [ \&query_done, $lei ],
+               'mset_progress' => [ \&mset_progress, $lei ],
+       };
+       (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
        my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
-       delete($lei->{op_pipe});
+       delete($lei->{pkt_op});
 
        $lei->event_step_init; # wait for shutdowns
-       my $done_op = {
-               '' => [ \&query_done, $lei ],
-               '!' => [ \&sigpipe_handler, $lei ]
-       };
-       my $in_loop = exists $lei->{sock};
-       $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);
        if ($l2m) {
-               $done_op->{'.'} = [ \&do_post_augment, $lei, $zpipe, $au_done ];
                $self->wq_do('query_prepare', \@io, $lei_ipc);
                $io[1] = $zpipe->[1] if $zpipe;
        }
@@ -396,7 +429,7 @@ sub do_query {
        $self->wq_close(1);
        unless ($in_loop) {
                # for the $lei_ipc->atfork_child_wq PIPE handler:
-               while ($done->{sock}) { $done->event_step }
+               while ($op->{sock}) { $op->event_step }
        }
 }