]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/LEI.pm
lei q: do not leave temporary files after oneshot exit
[public-inbox.git] / lib / PublicInbox / LEI.pm
index 22cd20f6edbbf1d0a5fdfd5731a5e7250b1a70b5..44afced306f4722ac0d4ed626010cda5ceea26a1 100644 (file)
@@ -14,10 +14,9 @@ use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
 use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
-use POSIX ();
+use POSIX qw(strftime);
 use IO::Handle ();
 use Fcntl qw(SEEK_SET);
-use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
 use PublicInbox::Sigfd;
@@ -105,7 +104,8 @@ our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
        save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
        sort|s=s reverse|r offset=i remote! local! external! pretty
-       mua-cmd|mua=s no-torsocks torsocks=s verbose|v
+       include|I=s@ exclude=s@ only=s@
+       mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q
        received-after=s received-before=s sent-after=s sent-since=s),
        PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ],
 
@@ -229,6 +229,13 @@ my %OPTDESC = (
 'q     format|f=s' => [
        'OUT|maildir|mboxrd|mboxcl2|mboxcl|mboxo|html|json|jsonl|concatjson',
                'specify output format, default depends on --output'],
+'q     exclude=s@' => [ 'URL_OR_PATHNAME',
+               'exclude specified external(s) from search' ],
+'q     include|I=s@' => [ 'URL_OR_PATHNAME',
+               'include specified external(s) in search' ],
+'q     only=s@' => [ 'URL_OR_PATHNAME',
+               'only use specified external(s) for search' ],
+
 'ls-query      format|f=s' => $ls_format,
 'ls-external   format|f=s' => $ls_format,
 
@@ -277,18 +284,22 @@ sub x_it ($$) {
        dump_and_clear_log();
        if (my $sock = $self->{sock}) {
                send($sock, "x_it $code", MSG_EOR);
-       } elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13)
-               $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
-               kill $signum, $$;
-               sleep; # wait for signal
-       } else { # oneshot
+       } elsif ($self->{oneshot}) {
                # don't want to end up using $? from child processes
                for my $f (qw(lxs l2m)) {
                        my $wq = delete $self->{$f} or next;
                        $wq->DESTROY;
                }
-               $quit->($code >> 8);
-       }
+               # cleanup anything that has tempfiles
+               delete @$self{qw(ovv dedupe)};
+               if (my $signum = ($code & 127)) { # usually SIGPIPE (13)
+                       $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
+                       kill $signum, $$;
+                       sleep; # wait for signal
+               } else {
+                       $quit->($code >> 8);
+               }
+       } # else ignore if client disconnected
 }
 
 sub err ($;@) {
@@ -306,7 +317,8 @@ sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
 
 sub fail ($$;$) {
        my ($self, $buf, $exit_code) = @_;
-       err($self, $buf);
+       err($self, $buf) if defined $buf;
+       send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler
        x_it($self, ($exit_code // 1) << 8);
        undef;
 }
@@ -326,10 +338,9 @@ sub child_error { # passes non-fatal curl exit codes to user
        my ($self, $child_error) = @_; # child_error is $?
        if (my $sock = $self->{sock}) { # send to lei(1) client
                send($sock, "child_error $child_error", MSG_EOR);
-       } else { # oneshot
+       } elsif ($self->{oneshot}) {
                $self->{child_error} = $child_error;
-       }
-       undef;
+       } # else noop if client disconnected
 }
 
 sub atfork_prepare_wq {
@@ -366,18 +377,17 @@ sub io_restore ($$) {
        }
 }
 
-# triggers sigpipe_handler
-sub note_sigpipe {
+sub note_sigpipe { # triggers sigpipe_handler
        my ($self, $fd) = @_;
        close(delete($self->{$fd})); # explicit close silences Perl warning
-       syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+       send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op};
        x_it($self, 13);
 }
 
 sub atfork_child_wq {
        my ($self, $wq) = @_;
        io_restore($self, $wq);
-       -p $self->{op_pipe} or die 'BUG: {op_pipe} expected';
+       -S $self->{pkt_op} or die 'BUG: {pkt_op} expected';
        io_restore($self->{l2m}, $wq);
        %PATH2CFG = ();
        undef $errors_log;
@@ -402,13 +412,14 @@ sub atfork_parent_wq {
        my ($self, $wq) = @_;
        my $env = delete $self->{env}; # env is inherited at fork
        my $lei = bless { %$self }, ref($self);
-       if (my $dedupe = delete $lei->{dedupe}) {
-               $lei->{dedupe} = $wq->deep_clone($dedupe);
+       for my $f (qw(dedupe ovv)) {
+               my $tmp = delete($lei->{$f}) or next;
+               $lei->{$f} = $wq->deep_clone($tmp);
        }
        $self->{env} = $env;
        delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m
        my @io = (delete(@$lei{qw(0 1 2)}),
-                       io_extract($lei, qw(sock op_pipe startq)));
+                       io_extract($lei, qw(sock pkt_op startq)));
        my $l2m = $lei->{l2m};
        if ($l2m && $l2m != $wq) { # $wq == lxs
                if (my $wq_s1 = $l2m->{-wq_s1}) {
@@ -736,8 +747,7 @@ sub lei__complete {
                my $opt = quotemeta $1;
                puts $self, map {
                        my $v = $OPTDESC{$_};
-                       $v = $v->[0] if ref($v);
-                       my @v = split(/\|/, $v);
+                       my @v = ref($v) ? split(/\|/, $v->[0]) : ();
                        # get rid of ALL CAPS placeholder (e.g "OUT")
                        # (TODO: completion for external paths)
                        shift(@v) if uc($v[0]) eq $v[0];
@@ -777,7 +787,7 @@ sub start_mua {
        push @cmd, $mfolder unless defined($replaced);
        if (my $sock = $self->{sock}) { # lei(1) client process runs it
                send($sock, exec_buf(\@cmd, {}), MSG_EOR);
-       } else { # oneshot
+       } elsif ($self->{oneshot}) {
                $self->{"mua.pid.$self.$$"} = spawn(\@cmd);
        }
 }
@@ -795,13 +805,16 @@ sub start_pager {
        $new_env->{MORE} = 'FRX' if $^O eq 'freebsd';
        pipe(my ($r, $wpager)) or return warn "pipe: $!";
        my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-       my $pgr = [ undef, @$rdr{1, 2}, $$ ];
+       my $pgr = [ undef, @$rdr{1, 2} ];
        if (my $sock = $self->{sock}) { # lei(1) process runs it
                delete @$new_env{keys %$env}; # only set iff unset
                my $fds = [ map { fileno($_) } @$rdr{0..2} ];
                $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR);
-       } else {
+       } elsif ($self->{oneshot}) {
                $pgr->[0] = spawn([$pager], $new_env, $rdr);
+               $pgr->[3] = $$; # ew'll reap it
+       } else {
+               die 'BUG: start_pager w/o socket';
        }
        $self->{1} = $wpager;
        $self->{2} = $wpager if -t $self->{2};
@@ -816,7 +829,7 @@ sub stop_pager {
        # do not restore original stdout, just close it so we error out
        close(delete($self->{1})) if $self->{1};
        my $pid = $pgr->[0];
-       dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$;
+       dwaitpid($pid) if $pid && ($pgr->[3] // 0) == $$;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -824,7 +837,7 @@ sub accept_dispatch { # Listener {post_accept} callback
        $sock->autoflush(1);
        my $self = bless { sock => $sock }, __PACKAGE__;
        vec(my $rvec = '', fileno($sock), 1) = 1;
-       select($rvec, undef, undef, 1) or
+       select($rvec, undef, undef, 60) or
                return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
        my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
        if (scalar(@fds) == 4) {
@@ -834,7 +847,9 @@ sub accept_dispatch { # Listener {post_accept} callback
                        send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
                }
        } else {
-               return send($sock, "recv_cmd failed: $!", MSG_EOR);
+               my $msg = "recv_cmd failed: $!";
+               warn $msg;
+               return send($sock, $msg, MSG_EOR);
        }
        $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
        # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
@@ -861,7 +876,7 @@ sub dclose {
                if ($wq->wq_kill) {
                        $wq->wq_close
                } elsif ($wq->wq_kill_old) {
-                       $wq->wq_wait_old;
+                       $wq->wq_wait_old($self);
                }
        }
        close(delete $self->{1}) if $self->{1}; # may reap_compress
@@ -898,7 +913,6 @@ sub event_step {
 sub event_step_init {
        my ($self) = @_;
        if (my $sock = $self->{sock}) { # using DS->EventLoop
-               $sock->blocking(0);
                $self->SUPER::new($sock, EPOLLIN|EPOLLET);
        }
 }
@@ -1007,9 +1021,9 @@ sub lazy_start {
                                warn "$path dev/ino changed, quitting\n";
                                $path = undef;
                        }
-               } elsif (defined($path)) {
-                       warn "stat($path): $!, quitting ...\n";
-                       undef $path; # don't unlink
+               } elsif (defined($path)) { # ENOENT is common
+                       warn "stat($path): $!, quitting ...\n" if $! != ENOENT;
+                       undef $path;
                        $quit->();
                }
                return 1 if defined($path);
@@ -1029,18 +1043,14 @@ sub lazy_start {
        # STDIN was redirected to /dev/null above, closing STDERR and
        # STDOUT will cause the calling `lei' client process to finish
        # reading the <$daemon> pipe.
-       openlog($path, 'pid', 'user');
        local $SIG{__WARN__} = sub {
-               $current_lei ? err($current_lei, @_) : syslog('warning', "@_");
+               $current_lei ? err($current_lei, @_) : warn(
+                 strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(time))," $$ ", @_);
        };
-       my $on_destroy = PublicInbox::OnDestroy->new($$, sub {
-               syslog('crit', "$@") if $@;
-       });
        open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
        open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
        # $daemon pipe to `lei' closed, main loop begins:
        PublicInbox::DS->EventLoop;
-       @$on_destroy = (); # cancel on_destroy if we get here
        exit($exit_code // 0);
 }
 
@@ -1052,6 +1062,7 @@ sub oneshot {
        local %PATH2CFG;
        umask(077) // die("umask(077): $!");
        my $self = bless {
+               oneshot => 1,
                0 => *STDIN{GLOB},
                1 => *STDOUT{GLOB},
                2 => *STDERR{GLOB},