if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers
pkt_do($lei->{pkt_op}, 'mset_progress', @_);
} else { # single lei-daemon consumer
- my @args = ref($_[-1]) eq 'ARRAY' ? @{$_[-1]} : @_;
- my ($desc, $mset_size, $mset_total_est) = @args;
+ my ($desc, $mset_size, $mset_total_est) = @_;
$lei->{-mset_total} += $mset_size;
$lei->err("# $desc $mset_size/$mset_total_est");
}
shift(@$cmd) if !$cmd->[0];
$lei->err("# @$cmd") if $verbose;
- $? = 0;
- my $fh = popen_rd($cmd, $env, $rdr);
+ my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
$fh = IO::Uncompress::Gunzip->new($fh);
- eval {
- PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
- $lei, $each_smsg);
- };
- return $lei->fail("E: @$cmd: $@") if $@;
+ PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
+ $lei, $each_smsg);
+ waitpid($pid, 0) == $pid or die "BUG: waitpid (curl): $!";
if ($? == 0) {
my $nr = $lei->{-nr_remote_eml};
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
sub do_query {
my ($self, $lei) = @_;
$lei->{1}->autoflush(1);
+ $lei->start_pager if -t $lei->{1};
+ $lei->{ovv}->ovv_begin($lei);
my ($au_done, $zpipe);
my $l2m = $lei->{l2m};
+ $lei->atfork_prepare_wq($self);
+ $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset);
+ delete $self->{-ipc_atfork_child_close};
if ($l2m) {
+ $lei->atfork_prepare_wq($l2m);
+ $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset);
+ delete $l2m->{-ipc_atfork_child_close};
pipe($lei->{startq}, $au_done) or die "pipe: $!";
# 1031: F_SETPIPE_SZ
fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
'.' => [ \&do_post_augment, $lei, $zpipe, $au_done ],
'' => [ \&query_done, $lei ],
'mset_progress' => [ \&mset_progress, $lei ],
+ 'x_it' => [ $lei->can('x_it'), $lei ],
+ 'child_error' => [ $lei->can('child_error'), $lei ],
};
- (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, !$lei->{oneshot});
+ (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops);
my ($lei_ipc, @io) = $lei->atfork_parent_wq($self);
delete($lei->{pkt_op});