read($startq, my $query_prepare_done, 1);
}
+sub mset_progress {
+ my $lei = shift;
+ return unless $lei->{-progress};
+ if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers
+ pkt_do($lei->{pkt_op}, 'mset_progress', @_);
+ } else { # single lei-daemon consumer
+ my ($desc, $mset_size, $mset_total_est) = @_;
+ $lei->{-mset_total} += $mset_size;
+ $lei->err("# $desc $mset_size/$mset_total_est");
+ }
+}
+
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
do {
$mset = $srch->mset($mo->{qstr}, $mo);
- pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size,
+ mset_progress($lei, $desc, $mset->size,
$mset->get_matches_estimated);
my $ids = $srch->mset_to_artnums($mset, $mo);
my $ctx = { ids => $ids };
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
do {
$mset = $self->mset($mo->{qstr}, $mo);
- pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch',
+ mset_progress($lei, 'xsearch', $mset->size,
$mset->size, $mset->get_matches_estimated);
for my $mitem ($mset->items) {
my $smsg = smsg_for($self, $mitem) or next;
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
- ++$lei->{-nr_remote_eml};
- if (!$lei->{opt}->{quiet}) {
+ if ($lei->{-progress}) {
+ ++$lei->{-nr_remote_eml};
my $now = now();
my $next = $lei->{-next_progress} //= ($now + 1);
if ($now > $next) {
my ($self, $lei, $uris) = @_;
local $0 = "$0 query_remote_mboxrd";
$lei->atfork_child_wq($self);
+ local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
my ($opt, $env) = @$lei{qw(opt env)};
my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
push(@qform, t => 1) if $opt->{thread};
shift(@$cmd) if !$cmd->[0];
$lei->err("# @$cmd") if $verbose;
- $? = 0;
- my $fh = popen_rd($cmd, $env, $rdr);
+ my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
$fh = IO::Uncompress::Gunzip->new($fh);
- eval {
- PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
- $lei, $each_smsg);
- };
- return $lei->fail("E: @$cmd: $@") if $@;
+ PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
+ $lei, $each_smsg);
+ waitpid($pid, 0) == $pid or die "BUG: waitpid (curl): $!";
if ($? == 0) {
my $nr = $lei->{-nr_remote_eml};
- pkt_do($lei->{pkt_op}, 'mset_progress',
- $lei->{-current_url}, $nr, $nr);
+ mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
}
$lei->start_mua;
}
- $lei->{opt}->{quiet} or
+ $lei->{-progress} and
$lei->err('# ', $lei->{-mset_total} // 0, " matches");
$lei->dclose;
}
-sub mset_progress { # called via pkt_op/pkt_do from workers
- my ($lei, $pargs) = @_;
- my ($desc, $mset_size, $mset_total_est) = @$pargs;
- return if $lei->{opt}->{quiet};
- $lei->{-mset_total} += $mset_size;
- $lei->err("# $desc $mset_size/$mset_total_est");
-}
-
sub do_post_augment {
my ($lei, $zpipe, $au_done) = @_;
my $l2m = $lei->{l2m} or die 'BUG: no {l2m}';
}
my $MAX_PER_HOST = 4;
-sub MAX_PER_HOST { $MAX_PER_HOST }
sub concurrency {
my ($self, $opt) = @_;
sub fail_handler ($;$$) {
my ($lei, $code, $io) = @_;
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
+ for my $f (qw(lxs l2m)) {
+ my $wq = delete $lei->{$f} or next;
+ $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon
}
close($io) if $io; # needed to avoid warnings on SIGPIPE
$lei->x_it($code // (1 >> 8));
'.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
+ 'x_it' => [ $lei->can('x_it'), $lei ],
+ 'child_error' => [ $lei->can('child_error'), $lei ],
};
- (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, !$lei->{oneshot});
+ (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops);
my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
delete($lei->{pkt_op});