X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLeiXSearch.pm;h=95862306cd7a98beba3aca94659f17bd7bf3460a;hb=7349713101700e488231ad9ffece8ee42de0928c;hp=1c093a94ca35389aadfdeabf622cfccd25af821a;hpb=0ca8bd671784ffd1a4ed59914e0b9a0bef7c0ea5;p=public-inbox.git diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 1c093a94..95862306 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -8,13 +8,16 @@ package PublicInbox::LeiXSearch; use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); -use PublicInbox::DS qw(dwaitpid); -use PublicInbox::OpPipe; +use PublicInbox::DS qw(dwaitpid now); +use PublicInbox::PktOp qw(pkt_do); use PublicInbox::Import; use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); use PublicInbox::Search qw(xap_terms); -use PublicInbox::Spawn qw(popen_rd); +use PublicInbox::Spawn qw(popen_rd spawn which); +use PublicInbox::MID qw(mids); +use PublicInbox::Smsg; +use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR); sub new { my ($class) = @_; @@ -31,7 +34,7 @@ sub attach_external { my $srch = $ibxish->search or return warn("$desc not indexed for Xapian\n"); my @shards = $srch->xdb_shards_flat or - return warn("$desc has no Xapian shardsXapian\n"); + return warn("$desc has no Xapian shards\n"); if (delete $self->{xdb}) { # XXX: do we need this? # clobber existing {xdb} if amending @@ -93,7 +96,7 @@ sub over {} sub _mset_more ($$) { my ($mset, $mo) = @_; my $size = $mset->size; - $size && (($mo->{offset} += $size) < ($mo->{limit} // 10000)); + $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit}); } # $startq will EOF when query_prepare is done augmenting and allow @@ -107,23 +110,19 @@ sub wait_startq ($) { sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; local $0 = "$0 query_thread_mset"; - my $startq = delete $self->{5}; - my %sig = $lei->atfork_child_wq($self); - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); + my $startq = delete $lei->{startq}; my ($srch, $over) = ($ibxish->search, $ibxish->over); - unless ($srch && $over) { - my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; - warn "$desc not indexed by Xapian\n"; - return; - } + my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; + return warn("$desc not indexed by Xapian\n") unless ($srch && $over); my $mo = { %{$lei->{mset_opt}} }; my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish); - my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; - $dedupe->prepare_dedupe; do { $mset = $srch->mset($mo->{qstr}, $mo); + pkt_do($lei->{pkt_op}, 'mset_progress', $desc, $mset->size, + $mset->get_matches_estimated); my $ids = $srch->mset_to_artnums($mset, $mo); my $ctx = { ids => $ids }; my $i = 0; @@ -132,7 +131,6 @@ sub query_thread_mset { # for --thread for my $n (@{$ctx->{xids}}) { my $smsg = $over->get_art($n) or next; wait_startq($startq) if $startq; - next if $dedupe->is_smsg_dup($smsg); my $mitem = delete $n2item{$smsg->{num}}; $each_smsg->($smsg, $mitem); } @@ -146,23 +144,21 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei) = @_; local $0 = "$0 query_mset"; - my $startq = delete $self->{5}; - my %sig = $lei->atfork_child_wq($self); - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); + my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; for my $loc (locals($self)) { attach_external($self, $loc); } my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self); - my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; - $dedupe->prepare_dedupe; do { $mset = $self->mset($mo->{qstr}, $mo); + pkt_do($lei->{pkt_op}, 'mset_progress', 'xsearch', + $mset->size, $mset->get_matches_estimated); for my $mitem ($mset->items) { my $smsg = smsg_for($self, $mitem) or next; wait_startq($startq) if $startq; - next if $dedupe->is_smsg_dup($smsg); $each_smsg->($smsg, $mitem); } } while (_mset_more($mset, $mo)); @@ -174,26 +170,53 @@ sub each_eml { # callback for MboxReader->mboxrd my ($eml, $self, $lei, $each_smsg) = @_; my $smsg = bless {}, 'PublicInbox::Smsg'; $smsg->populate($eml); + $smsg->parse_references($eml, mids($eml)); $smsg->{$_} //= '' for qw(from to cc ds subject references mid); delete @$smsg{qw(From Subject -ds -ts)}; - if (my $startq = delete($self->{5})) { wait_startq($startq) } - return if !$lei->{l2m} && $lei->{dedupe}->is_smsg_dup($smsg); + if (my $startq = delete($lei->{startq})) { wait_startq($startq) } + ++$lei->{-nr_remote_eml}; + if (!$lei->{opt}->{quiet}) { + my $now = now(); + my $next = $lei->{-next_progress} //= ($now + 1); + if ($now > $next) { + $lei->{-next_progress} = $now + 1; + my $nr = $lei->{-nr_remote_eml}; + $lei->err("# $lei->{-current_url} $nr/?"); + } + } $each_smsg->($smsg, undef, $eml); } +# PublicInbox::OnDestroy callback +sub kill_reap { + my ($pid) = @_; + kill('KILL', $pid); # spawn() blocks other signals + waitpid($pid, 0); +} + sub query_remote_mboxrd { my ($self, $lei, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; - my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); my ($opt, $env) = @$lei{qw(opt env)}; my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm'); push(@qform, t => 1) if $opt->{thread}; - my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; - $dedupe->prepare_dedupe; - my @cmd = qw(curl -XPOST -sSf); + my @cmd = ($self->{curl}, qw(-sSf -d), ''); my $verbose = $opt->{verbose}; - push @cmd, '-v' if $verbose; + my $reap; + my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1); + fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!"; + my $rdr = { 2 => $cerr }; + my $coff = 0; + if ($verbose) { + # spawn a process to force line-buffering, otherwise curl + # will write 1 character at-a-time and parallel outputs + # mmmaaayyy llloookkk llliiikkkeee ttthhhiiisss + push @cmd, '-v'; + my $o = { 1 => $lei->{2}, 2 => $lei->{2} }; + my $pid = spawn(['tail', '-f', $cerr->filename], undef, $o); + $reap = PublicInbox::OnDestroy->new(\&kill_reap, $pid); + } for my $o ($lei->curl_opt) { $o =~ s/\|[a-z0-9]\b//i; # remove single char short option if ($o =~ s/=[is]@\z//) { @@ -208,34 +231,50 @@ sub query_remote_mboxrd { } $opt->{torsocks} = 'false' if $opt->{'no-torsocks'}; my $tor = $opt->{torsocks} //= 'auto'; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); for my $uri (@$uris) { + $lei->{-current_url} = $uri->as_string; + $lei->{-nr_remote_eml} = 0; $uri->query_form(@qform); - my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $uri); my $cmd = [ @cmd, $uri->as_string ]; if ($tor eq 'auto' && substr($uri->host, -6) eq '.onion' && (($env->{LD_PRELOAD}//'') !~ /torsocks/)) { - unshift @$cmd, 'torsocks'; + unshift @$cmd, which('torsocks'); } elsif (PublicInbox::Config::git_bool($tor)) { - unshift @$cmd, 'torsocks'; + unshift @$cmd, which('torsocks'); } + + # continue anyways if torsocks is missing; a proxy may be + # specified via CLI, curlrc, environment variable, or even + # firewall rule + shift(@$cmd) if !$cmd->[0]; + $lei->err("# @$cmd") if $verbose; $? = 0; - my $fh = popen_rd($cmd, $env, { 2 => $lei->{2} }); + my $fh = popen_rd($cmd, $env, $rdr); $fh = IO::Uncompress::Gunzip->new($fh); eval { PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self, $lei, $each_smsg); }; return $lei->fail("E: @$cmd: $@") if $@; - if (($? >> 8) == 22) { # HTTP 404 from curl(1) - $uri->query_form(q => $lei->{mset_opt}->{qstr}); - $lei->err('# no results from '.$uri->as_string); - } elsif ($?) { - $uri->query_form(q => $lei->{mset_opt}->{qstr}); - $lei->err('E: '.$uri->as_string); - $lei->child_error($?); + if ($? == 0) { + my $nr = $lei->{-nr_remote_eml}; + pkt_do($lei->{pkt_op}, 'mset_progress', + $lei->{-current_url}, $nr, $nr); + next; } + seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n"; + my $e = do { local $/; <$cerr> } // + die "read(curl stderr): $!\n"; + $coff += length($e); + next if (($? >> 8) == 22 && $e =~ /\b404\b/); + $lei->child_error($?); + $uri->query_form(q => $lei->{mset_opt}->{qstr}); + # --verbose already showed the error via tail(1) + $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e); } + undef $each_smsg; $lei->{ovv}->ovv_atexit_child($lei); } @@ -263,7 +302,7 @@ sub query_done { # EOF callback my $has_l2m = exists $lei->{l2m}; for my $f (qw(lxs l2m)) { my $wq = delete $lei->{$f} or next; - $wq->wq_wait_old; + $wq->wq_wait_old($lei); } $lei->{ovv}->ovv_end($lei); if ($has_l2m) { # close() calls LeiToMail reap_compress @@ -277,9 +316,19 @@ Error closing $lei->{ovv}->{dst}: $! } $lei->start_mua; } + $lei->{opt}->{quiet} or + $lei->err('# ', $lei->{-mset_total} // 0, " matches"); $lei->dclose; } +sub mset_progress { # called via pkt_op/pkt_do from workers + my ($lei, $pargs) = @_; + my ($desc, $mset_size, $mset_total_est) = @$pargs; + return if $lei->{opt}->{quiet}; + $lei->{-mset_total} += $mset_size; + $lei->err("# $desc $mset_size/$mset_total_est"); +} + sub do_post_augment { my ($lei, $zpipe, $au_done) = @_; my $l2m = $lei->{l2m} or die 'BUG: no {l2m}'; @@ -311,7 +360,7 @@ sub start_query { # always runs in main (lei-daemon) process for my $ibxish (locals($self)) { $self->wq_do('query_thread_mset', $io, $lei, $ibxish); } - } else { + } elsif (locals($self)) { $self->wq_do('query_mset', $io, $lei); } my $i = 0; @@ -328,80 +377,80 @@ sub start_query { # always runs in main (lei-daemon) process sub query_prepare { # called by wq_do my ($self, $lei) = @_; local $0 = "$0 query_prepare"; - my %sig = $lei->atfork_child_wq($self); - -p $lei->{0} or die "BUG: \$done pipe expected"; - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); + delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - syswrite($lei->{0}, '.') == 1 or die "do_post_augment trigger: $!"; + pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!" } -sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers - my ($lei) = @_; - my $lxs = delete $lei->{lxs}; - if ($lxs && $lxs->wq_kill_old) { - kill 'PIPE', $$; - $lxs->wq_wait_old; +sub fail_handler ($;$$) { + my ($lei, $code, $io) = @_; + if (my $lxs = delete $lei->{lxs}) { + $lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon } - close(delete $lei->{1}) if $lei->{1}; + close($io) if $io; # needed to avoid warnings on SIGPIPE + $lei->x_it($code // (1 >> 8)); +} + +sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers + fail_handler($_[0], 13, delete $_[0]->{1}); } sub do_query { - my ($self, $lei_orig) = @_; - my ($lei, @io) = $lei_orig->atfork_parent_wq($self); - $io[0] = undef; - pipe(my $done, $io[0]) or die "pipe $!"; - $lei_orig->{1}->autoflush(1); - - $lei_orig->event_step_init; # wait for shutdowns - my $done_op = { - '' => [ \&query_done, $lei_orig ], - '!' => [ \&sigpipe_handler, $lei_orig ] - }; - my $in_loop = exists $lei_orig->{sock}; - $done = PublicInbox::OpPipe->new($done, $done_op, $in_loop); + my ($self, $lei) = @_; + $lei->{1}->autoflush(1); + my ($au_done, $zpipe); my $l2m = $lei->{l2m}; if ($l2m) { - # may redirect $lei->{1} for mbox - my $zpipe = $l2m->pre_augment($lei_orig); - $io[1] = $lei_orig->{1}; - pipe(my ($startq, $au_done)) or die "pipe: $!"; - $done_op->{'.'} = [ \&do_post_augment, $lei_orig, - $zpipe, $au_done ]; - local $io[4] = *STDERR{GLOB}; # don't send l2m->{-wq_s1} - die "BUG: unexpected \$io[5]: $io[5]" if $io[5]; - $self->wq_do('query_prepare', \@io, $lei); - fcntl($startq, 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ - $io[5] = $startq; + pipe($lei->{startq}, $au_done) or die "pipe: $!"; + # 1031: F_SETPIPE_SZ + fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; + $zpipe = $l2m->pre_augment($lei); + } + my $in_loop = exists $lei->{sock}; + my $ops = { + '|' => [ \&sigpipe_handler, $lei ], + '!' => [ \&fail_handler, $lei ], + '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ], + '' => [ \&query_done, $lei ], + 'mset_progress' => [ \&mset_progress, $lei ], + }; + (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops, $in_loop); + my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); + delete($lei->{pkt_op}); + + $lei->event_step_init; # wait for shutdowns + if ($l2m) { + $self->wq_do('query_prepare', \@io, $lei_ipc); $io[1] = $zpipe->[1] if $zpipe; } - start_query($self, \@io, $lei); + start_query($self, \@io, $lei_ipc); $self->wq_close(1); unless ($in_loop) { - # for the $lei->atfork_child_wq PIPE handler: - while ($done->{sock}) { $done->event_step } + # for the $lei_ipc->atfork_child_wq PIPE handler: + while ($op->{sock}) { $op->event_step } } } -sub ipc_atfork_prepare { - my ($self) = @_; - if (exists $self->{remotes}) { +sub add_uri { + my ($self, $uri) = @_; + if (my $curl = $self->{curl} //= which('curl') // 0) { require PublicInbox::MboxReader; require IO::Uncompress::Gunzip; + push @{$self->{remotes}}, $uri; + } else { + warn "curl missing, ignoring $uri\n"; } - # (0: done_wr, 1: stdout|mbox, 2: stderr, - # 3: sock, 4: $l2m->{-wq_s1}, 5: $startq) - $self->wq_set_recv_modes(qw[+<&= >&= >&= +<&= +<&= <&=]); - $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } sub prepare_external { my ($self, $loc, $boost) = @_; # n.b. already ordered by boost if (ref $loc) { # already a URI, or PublicInbox::Inbox-like object - return push(@{$self->{remotes}}, $loc) if $loc->can('scheme'); + return add_uri($self, $loc) if $loc->can('scheme'); } elsif ($loc =~ m!\Ahttps?://!) { require URI; - return push(@{$self->{remotes}}, URI->new($loc)); + return add_uri($self, URI->new($loc)); } elsif (-f "$loc/ei.lock") { require PublicInbox::ExtSearch; $loc = PublicInbox::ExtSearch->new($loc);