use strict;
use v5.10.1;
use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::DS qw(dwaitpid);
-use PublicInbox::PktOp;
+use PublicInbox::DS qw(dwaitpid now);
+use PublicInbox::PktOp qw(pkt_do);
use PublicInbox::Import;
use File::Temp 0.19 (); # 0.19 for ->newdir
use File::Spec ();
-use Socket qw(MSG_EOR);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::Spawn qw(popen_rd spawn which);
use PublicInbox::MID qw(mids);
sub _mset_more ($$) {
my ($mset, $mo) = @_;
my $size = $mset->size;
- $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+ $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit});
}
# $startq will EOF when query_prepare is done augmenting and allow
my $startq = delete $lei->{startq};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
- unless ($srch && $over) {
- my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
- warn "$desc not indexed by Xapian\n";
- return;
- }
+ my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+ return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
my $mo = { %{$lei->{mset_opt}} };
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
do {
$mset = $srch->mset($mo->{qstr}, $mo);
+ pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+ $mset->get_matches_estimated);
my $ids = $srch->mset_to_artnums($mset, $mo);
my $ctx = { ids => $ids };
my $i = 0;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
do {
$mset = $self->mset($mo->{qstr}, $mo);
+ pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+ $mset->size, $mset->get_matches_estimated);
for my $mitem ($mset->items) {
my $smsg = smsg_for($self, $mitem) or next;
wait_startq($startq) if $startq;
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+ ++$lei->{-nr_remote_eml};
+ if (!$lei->{opt}->{quiet}) {
+ my $now = now();
+ my $next = $lei->{-next_progress} //= ($now + 1);
+ if ($now > $next) {
+ $lei->{-next_progress} = $now + 1;
+ my $nr = $lei->{-nr_remote_eml};
+ $lei->err("# $lei->{-current_url} $nr/?");
+ }
+ }
$each_smsg->($smsg, undef, $eml);
}
my $tor = $opt->{torsocks} //= 'auto';
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
for my $uri (@$uris) {
+ $lei->{-current_url} = $uri->as_string;
+ $lei->{-nr_remote_eml} = 0;
$uri->query_form(@qform);
my $cmd = [ @cmd, $uri->as_string ];
if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' &&
$lei, $each_smsg);
};
return $lei->fail("E: @$cmd: $@") if $@;
- next unless $?;
+ if ($? == 0) {
+ my $nr = $lei->{-nr_remote_eml};
+ pkt_do($lei->{pkt_op}, 'mset_progress',
+ $lei->{-current_url}, $nr, $nr);
+ next;
+ }
seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
my $e = do { local $/; <$cerr> } //
die "read(curl stderr): $!\n";
$coff += length($e);
+ truncate($cerr, 0);
next if (($? >> 8) == 22 && $e =~ /\b404\b/);
$lei->child_error($?);
$uri->query_form(q => $lei->{mset_opt}->{qstr});
}
$lei->start_mua;
}
+ $lei->{opt}->{quiet} or
+ $lei->err('# ', $lei->{-mset_total} // 0, " matches");
$lei->dclose;
}
+sub mset_progress { # called via pkt_op/pkt_do from workers
+ my ($lei, $pargs) = @_;
+ my ($desc, $mset_size, $mset_total_est) = @$pargs;
+ return if $lei->{opt}->{quiet};
+ $lei->{-mset_total} += $mset_size;
+ $lei->err("# $desc $mset_size/$mset_total_est");
+}
+
sub do_post_augment {
my ($lei, $zpipe, $au_done) = @_;
my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
delete $lei->{l2m}->{-wq_s1};
eval { $lei->{l2m}->do_augment($lei) };
$lei->fail($@) if $@;
- send($lei->{pkt_op}, '.', MSG_EOR) == 1 or
- die "do_post_augment trigger: $!"
+ pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!"
}
sub fail_handler ($;$$) {
fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
$zpipe = $l2m->pre_augment($lei);
}
- my $in_loop = exists $lei->{sock};
my $ops = {
'|' => [ \&sigpipe_handler, $lei ],
'!' => [ \&fail_handler, $lei ],
'.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
'' => [ \&query_done, $lei ],
+ 'mset_progress' => [ \&mset_progress, $lei ],
};
- (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop);
+ (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, !$lei->{oneshot});
my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
delete($lei->{pkt_op});
}
start_query($self, \@io, $lei_ipc);
$self->wq_close(1);
- unless ($in_loop) {
+ if ($lei->{oneshot}) {
# for the $lei_ipc->atfork_child_wq PIPE handler:
while ($op->{sock}) { $op->event_step }
}