X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=44afced306f4722ac0d4ed626010cda5ceea26a1;hb=37e2fd40b9d3c37b202a693f6929411bd1064eae;hp=4f7ed17109ae3f72b21b49f5a97ccb1c730c2ddf;hpb=59ff0aa24dc4c27fc05a36e97526ae4b0c163f65;p=public-inbox.git diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 4f7ed171..44afced3 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -104,7 +104,8 @@ our %CMD = ( # sorted in order of importance/use: 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw( save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a sort|s=s reverse|r offset=i remote! local! external! pretty - mua-cmd|mua=s no-torsocks torsocks=s verbose|v + include|I=s@ exclude=s@ only=s@ + mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q received-after=s received-before=s sent-after=s sent-since=s), PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], @@ -228,6 +229,13 @@ my %OPTDESC = ( 'q format|f=s' => [ 'OUT|maildir|mboxrd|mboxcl2|mboxcl|mboxo|html|json|jsonl|concatjson', 'specify output format, default depends on --output'], +'q exclude=s@' => [ 'URL_OR_PATHNAME', + 'exclude specified external(s) from search' ], +'q include|I=s@' => [ 'URL_OR_PATHNAME', + 'include specified external(s) in search' ], +'q only=s@' => [ 'URL_OR_PATHNAME', + 'only use specified external(s) for search' ], + 'ls-query format|f=s' => $ls_format, 'ls-external format|f=s' => $ls_format, @@ -276,18 +284,22 @@ sub x_it ($$) { dump_and_clear_log(); if (my $sock = $self->{sock}) { send($sock, "x_it $code", MSG_EOR); - } elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13) - $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work - kill $signum, $$; - sleep; # wait for signal - } else { # oneshot + } elsif ($self->{oneshot}) { # don't want to end up using $? from child processes for my $f (qw(lxs l2m)) { my $wq = delete $self->{$f} or next; $wq->DESTROY; } - $quit->($code >> 8); - } + # cleanup anything that has tempfiles + delete @$self{qw(ovv dedupe)}; + if (my $signum = ($code & 127)) { # usually SIGPIPE (13) + $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work + kill $signum, $$; + sleep; # wait for signal + } else { + $quit->($code >> 8); + } + } # else ignore if client disconnected } sub err ($;@) { @@ -305,7 +317,8 @@ sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; - err($self, $buf); + err($self, $buf) if defined $buf; + send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler x_it($self, ($exit_code // 1) << 8); undef; } @@ -325,10 +338,9 @@ sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error) = @_; # child_error is $? if (my $sock = $self->{sock}) { # send to lei(1) client send($sock, "child_error $child_error", MSG_EOR); - } else { # oneshot + } elsif ($self->{oneshot}) { $self->{child_error} = $child_error; - } - undef; + } # else noop if client disconnected } sub atfork_prepare_wq { @@ -365,18 +377,17 @@ sub io_restore ($$) { } } -# triggers sigpipe_handler -sub note_sigpipe { +sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; close(delete($self->{$fd})); # explicit close silences Perl warning - syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; + send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op}; x_it($self, 13); } sub atfork_child_wq { my ($self, $wq) = @_; io_restore($self, $wq); - -p $self->{op_pipe} or die 'BUG: {op_pipe} expected'; + -S $self->{pkt_op} or die 'BUG: {pkt_op} expected'; io_restore($self->{l2m}, $wq); %PATH2CFG = (); undef $errors_log; @@ -401,13 +412,14 @@ sub atfork_parent_wq { my ($self, $wq) = @_; my $env = delete $self->{env}; # env is inherited at fork my $lei = bless { %$self }, ref($self); - if (my $dedupe = delete $lei->{dedupe}) { - $lei->{dedupe} = $wq->deep_clone($dedupe); + for my $f (qw(dedupe ovv)) { + my $tmp = delete($lei->{$f}) or next; + $lei->{$f} = $wq->deep_clone($tmp); } $self->{env} = $env; delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m my @io = (delete(@$lei{qw(0 1 2)}), - io_extract($lei, qw(sock op_pipe startq))); + io_extract($lei, qw(sock pkt_op startq))); my $l2m = $lei->{l2m}; if ($l2m && $l2m != $wq) { # $wq == lxs if (my $wq_s1 = $l2m->{-wq_s1}) { @@ -735,8 +747,7 @@ sub lei__complete { my $opt = quotemeta $1; puts $self, map { my $v = $OPTDESC{$_}; - $v = $v->[0] if ref($v); - my @v = split(/\|/, $v); + my @v = ref($v) ? split(/\|/, $v->[0]) : (); # get rid of ALL CAPS placeholder (e.g "OUT") # (TODO: completion for external paths) shift(@v) if uc($v[0]) eq $v[0]; @@ -776,7 +787,7 @@ sub start_mua { push @cmd, $mfolder unless defined($replaced); if (my $sock = $self->{sock}) { # lei(1) client process runs it send($sock, exec_buf(\@cmd, {}), MSG_EOR); - } else { # oneshot + } elsif ($self->{oneshot}) { $self->{"mua.pid.$self.$$"} = spawn(\@cmd); } } @@ -794,13 +805,16 @@ sub start_pager { $new_env->{MORE} = 'FRX' if $^O eq 'freebsd'; pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; - my $pgr = [ undef, @$rdr{1, 2}, $$ ]; + my $pgr = [ undef, @$rdr{1, 2} ]; if (my $sock = $self->{sock}) { # lei(1) process runs it delete @$new_env{keys %$env}; # only set iff unset my $fds = [ map { fileno($_) } @$rdr{0..2} ]; $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR); - } else { + } elsif ($self->{oneshot}) { $pgr->[0] = spawn([$pager], $new_env, $rdr); + $pgr->[3] = $$; # ew'll reap it + } else { + die 'BUG: start_pager w/o socket'; } $self->{1} = $wpager; $self->{2} = $wpager if -t $self->{2}; @@ -815,7 +829,7 @@ sub stop_pager { # do not restore original stdout, just close it so we error out close(delete($self->{1})) if $self->{1}; my $pid = $pgr->[0]; - dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$; + dwaitpid($pid) if $pid && ($pgr->[3] // 0) == $$; } sub accept_dispatch { # Listener {post_accept} callback @@ -823,7 +837,7 @@ sub accept_dispatch { # Listener {post_accept} callback $sock->autoflush(1); my $self = bless { sock => $sock }, __PACKAGE__; vec(my $rvec = '', fileno($sock), 1) = 1; - select($rvec, undef, undef, 1) or + select($rvec, undef, undef, 60) or return send($sock, 'timed out waiting to recv FDs', MSG_EOR); my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN if (scalar(@fds) == 4) { @@ -833,7 +847,9 @@ sub accept_dispatch { # Listener {post_accept} callback send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); } } else { - return send($sock, "recv_cmd failed: $!", MSG_EOR); + my $msg = "recv_cmd failed: $!"; + warn $msg; + return send($sock, $msg, MSG_EOR); } $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); @@ -897,7 +913,6 @@ sub event_step { sub event_step_init { my ($self) = @_; if (my $sock = $self->{sock}) { # using DS->EventLoop - $sock->blocking(0); $self->SUPER::new($sock, EPOLLIN|EPOLLET); } } @@ -1047,6 +1062,7 @@ sub oneshot { local %PATH2CFG; umask(077) // die("umask(077): $!"); my $self = bless { + oneshot => 1, 0 => *STDIN{GLOB}, 1 => *STDOUT{GLOB}, 2 => *STDERR{GLOB},