+sub each_remote_eml { # callback for MboxReader->mboxrd
+ my ($eml, $self, $lei, $each_smsg) = @_;
+ if (my $sto = $self->{import_sto}) {
+ $sto->ipc_do('add_eml_maybe', $eml);
+ }
+ my $smsg = bless {}, 'PublicInbox::Smsg';
+ $smsg->populate($eml);
+ $smsg->parse_references($eml, mids($eml));
+ $smsg->{$_} //= '' for qw(from to cc ds subject references mid);
+ delete @$smsg{qw(From Subject -ds -ts)};
+ wait_startq($lei);
+ if ($lei->{-progress}) {
+ ++$lei->{-nr_remote_eml};
+ my $now = now();
+ my $next = $lei->{-next_progress} //= ($now + 1);
+ if ($now > $next) {
+ $lei->{-next_progress} = $now + 1;
+ my $nr = $lei->{-nr_remote_eml};
+ $lei->err("# $lei->{-current_url} $nr/?");
+ }
+ }
+ $each_smsg->($smsg, undef, $eml);
+}
+
+sub query_remote_mboxrd {
+ my ($self, $uris) = @_;
+ local $0 = "$0 query_remote_mboxrd";
+ local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
+ my $lei = $self->{lei};
+ my $opt = $lei->{opt};
+ my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
+ push(@qform, t => 1) if $opt->{threads};
+ my $verbose = $opt->{verbose};
+ my ($reap_tail, $reap_curl);
+ my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
+ fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
+ my $rdr = { 2 => $cerr, pgid => 0 };
+ my $sigint_reap = $lei->can('sigint_reap');
+ if ($verbose) {
+ # spawn a process to force line-buffering, otherwise curl
+ # will write 1 character at-a-time and parallel outputs
+ # mmmaaayyy llloookkk llliiikkkeee ttthhhiiisss
+ my $o = { 1 => $lei->{2}, 2 => $lei->{2}, pgid => 0 };
+ my $pid = spawn(['tail', '-f', $cerr->filename], undef, $o);
+ $reap_tail = PublicInbox::OnDestroy->new($sigint_reap, $pid);
+ }
+ my $curl = PublicInbox::LeiCurl->new($lei, $self->{curl}) or return;
+ push @$curl, '-s', '-d', '';
+ my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ $self->{import_sto} = $lei->{sto} if $lei->{opt}->{'import-remote'};
+ for my $uri (@$uris) {
+ $lei->{-current_url} = $uri->as_string;
+ $lei->{-nr_remote_eml} = 0;
+ $uri->query_form(@qform);
+ my $cmd = $curl->for_uri($lei, $uri);
+ $lei->qerr("# $cmd");
+ my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
+ $reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
+ $fh = IO::Uncompress::Gunzip->new($fh);
+ PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
+ $lei, $each_smsg);
+ my $err = waitpid($pid, 0) == $pid ? undef
+ : "BUG: waitpid($cmd): $!";
+ @$reap_curl = (); # cancel OnDestroy
+ die $err if $err;
+ my $nr = $lei->{-nr_remote_eml};
+ if ($nr && $lei->{sto}) {
+ my $wait = $lei->{sto}->ipc_do('done');
+ }
+ if ($? == 0) {
+ mset_progress($lei, $lei->{-current_url}, $nr, $nr);
+ next;
+ }
+ $err = '';
+ if (-s $cerr) {
+ seek($cerr, 0, SEEK_SET) or
+ $lei->err("seek($cmd stderr): $!");
+ $err = do { local $/; <$cerr> } //
+ "read($cmd stderr): $!";
+ truncate($cerr, 0) or
+ $lei->err("truncate($cmd stderr): $!");
+ }
+ next if (($? >> 8) == 22 && $err =~ /\b404\b/);
+ $uri->query_form(q => $lei->{mset_opt}->{qstr});
+ $lei->child_error($?, "E: <$uri> $err");
+ }
+ undef $each_smsg;
+ $lei->{ovv}->ovv_atexit_child($lei);
+}
+
+sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
+
+sub xsearch_done_wait { # dwaitpid callback
+ my ($arg, $pid) = @_;
+ my ($wq, $lei) = @$arg;
+ $lei->child_error($?, 'non-fatal error from '.ref($wq)) if $?;
+}
+
+sub query_done { # EOF callback for main daemon