+# Callback for MboxReader->mboxrd: wraps one message ($eml) read from a
+# remote mboxrd stream in a PublicInbox::Smsg and hands it to $each_smsg.
+# $self is accepted but unused here; $lei carries progress state.
+sub each_eml {
+	my ($eml, $self, $lei, $each_smsg) = @_;
+	my $smsg = bless {}, 'PublicInbox::Smsg';
+	$smsg->populate($eml);
+	$smsg->parse_references($eml, mids($eml));
+
+	# make sure each of these fields is defined ('' when still unset)
+	for my $field (qw(from to cc ds subject references mid)) {
+		$smsg->{$field} //= '';
+	}
+	delete @$smsg{qw(From Subject -ds -ts)};
+
+	# {startq} is removed on use, so later calls skip the wait
+	# unless something sets it again
+	my $startq = delete($lei->{startq});
+	wait_startq($startq) if $startq;
+
+	if ($lei->{-progress}) {
+		my $seen = ++$lei->{-nr_remote_eml};
+		my $t = now();
+		# first call only schedules the next report time
+		my $due = $lei->{-next_progress} //= ($t + 1);
+		if ($t > $due) {
+			$lei->{-next_progress} = $t + 1;
+			$lei->err("# $lei->{-current_url} $seen/?");
+		}
+	}
+	$each_smsg->($smsg, undef, $eml);
+}
+
+# Runs the current query against every URI in $uris using curl(1),
+# decompresses each response with IO::Uncompress::Gunzip and feeds the
+# resulting mboxrd stream to each_eml.
+#
+# $self->{lei} supplies the options, environment and query string;
+# $self->{curl} is passed to PublicInbox::LeiCurl->new.
+# Returns early if PublicInbox::LeiCurl->new fails.  Dies if waitpid on
+# curl fails or if curl's stderr file cannot be read.
+sub query_remote_mboxrd {
+	my ($self, $uris) = @_;
+	local $0 = "$0 query_remote_mboxrd";
+	local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
+	my $lei = $self->{lei};
+	my ($opt, $env) = @$lei{qw(opt env)};
+	my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
+	push(@qform, t => 1) if $opt->{thread};
+	my $verbose = $opt->{verbose};
+	my ($reap_tail, $reap_curl);
+	# curl's stderr is collected in a temporary file shared by all URIs
+	my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
+	fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
+	my $rdr = { 2 => $cerr, pgid => 0 };
+	my $sigint_reap = $lei->can('sigint_reap');
+	if ($verbose) {
+		# spawn a process to force line-buffering, otherwise curl
+		# will write 1 character at-a-time and parallel outputs
+		# mmmaaayyy llloookkk llliiikkkeee ttthhhiiisss
+		my $o = { 1 => $lei->{2}, 2 => $lei->{2}, pgid => 0 };
+		my $pid = spawn(['tail', '-f', $cerr->filename], undef, $o);
+		$reap_tail = PublicInbox::OnDestroy->new($sigint_reap, $pid);
+	}
+	my $curl = PublicInbox::LeiCurl->new($lei, $self->{curl}) or return;
+	push @$curl, '-s', '-d', '';
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+	for my $uri (@$uris) {
+		$lei->{-current_url} = $uri->as_string;
+		$lei->{-nr_remote_eml} = 0;
+		$uri->query_form(@qform);
+		my $cmd = $curl->for_uri($lei, $uri);
+		$lei->err("# @$cmd") if $verbose;
+		my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
+		$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
+		$fh = IO::Uncompress::Gunzip->new($fh);
+		PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
+						$lei, $each_smsg);
+		my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+		@$reap_curl = (); # cancel OnDestroy
+		die $err if $err;
+		if ($? == 0) {
+			my $nr = $lei->{-nr_remote_eml};
+			mset_progress($lei, $lei->{-current_url}, $nr, $nr);
+			next;
+		}
+		# The file is emptied after every failure and O_APPEND
+		# writes land at the current end of file, so the next
+		# curl's output starts at offset 0 again.  Always read
+		# from the start; a running offset would seek past it.
+		seek($cerr, 0, SEEK_SET) or warn "seek(curl stderr): $!\n";
+		my $e = do { local $/; <$cerr> } //
+				die "read(curl stderr): $!\n";
+		truncate($cerr, 0) or warn "truncate(curl stderr): $!\n";
+		# curl exit code 22 with "404" in its stderr is not
+		# reported as an error
+		next if (($? >> 8) == 22 && $e =~ /\b404\b/);
+		$lei->child_error($?);
+		# report the URI without the extra x=m / t=1 parameters
+		$uri->query_form(q => $lei->{mset_opt}->{qstr});
+		# --verbose already showed the error via tail(1)
+		$lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+	}
+	undef $each_smsg;
+	$lei->{ovv}->ovv_atexit_child($lei);
+}
+
+# called by LeiOverview::each_smsg_cb
+# Returns $self->{git_tmp}; dies when that slot is undefined.
+sub git {
+	my ($self) = @_;
+	$self->{git_tmp} // die 'BUG: caller did not set {git_tmp}';
+}
+
+# Creates a bare git repository in a fresh temporary directory and
+# writes its objects/info/alternates file with one "<git_dir>/objects"
+# line per distinct git_dir among locals($self).
+# Returns a PublicInbox::Git for that repository; the File::Temp
+# directory object is stored in its {-tmp} slot (presumably so the
+# directory lives exactly as long as the returned object).
+# Dies if the alternates file cannot be opened, written or closed.
+sub git_tmp ($) {
+	my ($self) = @_;
+	my $tmp = File::Temp->newdir("lei_xsearch_git.$$-XXXX", TMPDIR => 1);
+	my (@alt_lines, %dup);
+	for my $ibxish (locals($self)) {
+		my $dir = File::Spec->canonpath($ibxish->git->{git_dir});
+		next if $dup{$dir}++; # list every directory only once
+		push @alt_lines, "$dir/objects\n";
+	}
+	my $git_dir = $tmp->dirname;
+	PublicInbox::Import::init_bare($git_dir);
+	my $f = "$git_dir/objects/info/alternates";
+	open(my $alt, '>', $f) or die "open($f): $!";
+	print { $alt } @alt_lines or die "print $f: $!";
+	close($alt) or die "close $f: $!";
+	my $git = PublicInbox::Git->new($git_dir);
+	$git->{-tmp} = $tmp;
+	$git;
+}
+
+# EOF callback for main daemon.
+# Waits on the {l2m} and {lxs} workers (both slots are removed from
+# $lei), ends the overview, and when {l2m} was present restores the
+# saved {old_1} output before poking the destination or starting the
+# MUA.  Finally prints the match count if {-progress} is set and calls
+# $lei->dclose.  Returns early via $lei->fail if closing the mail
+# output fails.
+sub query_done {
+	my ($lei) = @_;
+	my $l2m = delete $lei->{l2m};
+	if ($l2m) {
+		$l2m->wq_wait_old($lei);
+	}
+	if (my $lxs = delete $lei->{lxs}) {
+		$lxs->wq_wait_old($lei);
+	}
+	$lei->{ovv}->ovv_end($lei);
+	if ($l2m) { # close() calls LeiToMail reap_compress
+		if (my $out = delete $lei->{old_1}) {
+			if (my $mbout = $lei->{1}) {
+				close($mbout) or return $lei->fail(<<"");
+Error closing $lei->{ovv}->{dst}: $!
+
+			}
+			$lei->{1} = $out;
+		}
+		if ($l2m->lock_free) {
+			$l2m->poke_dst;
+		} else {
+			$lei->start_mua;
+		}
+	}
+	if ($lei->{-progress}) {
+		$lei->err('# ', $lei->{-mset_total} // 0, " matches");
+	}
+	$lei->dclose;
+}
+
+# Calls post_augment on $lei->{l2m}.  If that throws, any {lxs} workers
+# are killed and closed and the error is reported through $lei->fail.
+# In every case {au_done} is removed from $lei and closed afterwards.
+sub do_post_augment {
+	my ($lei) = @_;
+	eval {
+		$lei->{l2m}->post_augment($lei);
+	};
+	if (my $exc = $@) {
+		if (my $workers = delete $lei->{lxs}) {
+			$workers->wq_kill;
+			$workers->wq_close;
+		}
+		$lei->fail("$exc");
+	}
+	close(delete $lei->{au_done}); # triggers wait_startq
+}
+
+# cap applied to the remotes figure in concurrency()
+my $MAX_PER_HOST = 4;
+
+# Returns the sum of two figures: locals($self) when $opt->{thread} is
+# set (otherwise 1), and remotes($self) limited to $MAX_PER_HOST.
+# Both helpers are called in scalar context, presumably giving counts.
+sub concurrency {
+	my ($self, $opt) = @_;
+	my $n_local = 1;
+	$n_local = locals($self) if $opt->{thread};
+	my $n_remote = remotes($self);
+	if ($n_remote > $MAX_PER_HOST) {
+		$n_remote = $MAX_PER_HOST;
+	}
+	$n_local + $n_remote;
+}
+
+sub start_query { # always runs in main (lei-daemon) process
+ my ($self, $lei) = @_;
+ if (my $l2m = $lei->{l2m}) {
+ $lei->start_mua if $l2m->lock_free;
+ }