X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=6c2515dc0eb74cf37530445e27acb44134d44702;hb=7349713101700e488231ad9ffece8ee42de0928c;hp=fd412324be3b9d3eefbc73f983a0e11938fcd79f;hpb=f2c7b911a1c4a7520091ba7224773c30e409c337;p=public-inbox.git diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index fd412324..6c2515dc 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020 all contributors +# Copyright (C) 2020-2021 all contributors # License: AGPL-3.0+ # Backend for `lei' (local email interface). Unlike the C10K-oriented @@ -8,23 +8,28 @@ package PublicInbox::LEI; use strict; use v5.10.1; -use parent qw(PublicInbox::DS); +use parent qw(PublicInbox::DS PublicInbox::LeiExternal + PublicInbox::LeiQuery); use Getopt::Long (); -use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un); -use Errno qw(EAGAIN ECONNREFUSED ENOENT); -use POSIX (); +use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); +use Cwd qw(getcwd); +use POSIX qw(strftime); use IO::Handle (); -use Sys::Syslog qw(syslog openlog); +use Fcntl qw(SEEK_SET); use PublicInbox::Config; -use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT); +use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET); use PublicInbox::Sigfd; -use PublicInbox::DS qw(now); -use PublicInbox::Spawn qw(spawn); +use PublicInbox::DS qw(now dwaitpid); +use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::OnDestroy; use Text::Wrap qw(wrap); +use Time::HiRes qw(stat); # ctime comparisons for config cache use File::Path qw(mkpath); use File::Spec; our $quit = \&CORE::exit; +our ($current_lei, $errors_log, $listener); +my ($recv_cmd, $send_cmd); my $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); my $GLP_PASS = Getopt::Long::Parser->new; @@ -36,40 +41,85 @@ our %PATH2CFG; # persistent for socket daemon # (may) pass options through to another command: sub pass_through { $GLP_PASS } +my $OPT; +sub opt_dash ($$) { + my ($spec, $re_str) = @_; # 'limit|n=i', '([0-9]+)' + my ($key) = ($spec =~ m/\A([a-z]+)/g); + my $cb = sub { # Getopt::Long "<>" catch-all handler + my ($arg) = @_; + if ($arg =~ /\A-($re_str)\z/) { + $OPT->{$key} = $1; + } elsif ($arg eq '--') { # "--" arg separator, ignore first + push @{$OPT->{-argv}}, $arg if $OPT->{'--'}++; + # lone (single) dash is handled elsewhere + } elsif (substr($arg, 0, 1) eq '-') { + if ($OPT->{'--'}) { + push @{$OPT->{-argv}}, $arg; + } else { + die "bad argument: $arg\n"; + } + } else { + push @{$OPT->{-argv}}, $arg; + } + }; + ($spec, '<>' => $cb, $GLP_PASS) # for Getopt::Long +} + +sub rel2abs ($$) { + my ($self, $p) = @_; + return $p if index($p, '/') == 0; # already absolute + my $pwd = $self->{env}->{PWD}; + if (defined $pwd) { + my $cwd = $self->{3} // getcwd() // die "getcwd(PWD=$pwd): $!"; + if (my @st_pwd = stat($pwd)) { + my @st_cwd = stat($cwd) or die "stat($cwd): $!"; + "@st_pwd[1,0]" eq "@st_cwd[1,0]" or + $self->{env}->{PWD} = $pwd = $cwd; + } else { # PWD was invalid + delete $self->{env}->{PWD}; + undef $pwd; + } + } + $pwd //= $self->{env}->{PWD} = getcwd() // die "getcwd(PWD=$pwd): $!"; + File::Spec->rel2abs($p, $pwd); +} + sub _store_path ($) { - my ($env) = @_; - File::Spec->rel2abs(($env->{XDG_DATA_HOME} // - ($env->{HOME} // '/nonexistent').'/.local/share') - .'/lei/store', $env->{PWD}); + my ($self) = @_; + rel2abs($self, ($self->{env}->{XDG_DATA_HOME} // + ($self->{env}->{HOME} // '/nonexistent').'/.local/share') + .'/lei/store'); } sub _config_path ($) { - my ($env) = @_; - File::Spec->rel2abs(($env->{XDG_CONFIG_HOME} // - ($env->{HOME} // '/nonexistent').'/.config') - .'/lei/config', $env->{PWD}); + my ($self) = @_; + rel2abs($self, ($self->{env}->{XDG_CONFIG_HOME} // + ($self->{env}->{HOME} // '/nonexistent').'/.config') + .'/lei/config'); } # TODO: generate shell completion + help using %CMD and %OPTDESC # command => [ positional_args, 1-line description, Getopt::Long option spec ] our %CMD = ( # sorted in order of importance/use: -'query' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw( - save-as=s output|o=s format|f=s dedupe|d=s thread|t augment|a - limit|n=i sort|s=s@ reverse|r offset=i remote local! extinbox! - since|after=s until|before=s) ], +'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw( + save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a + sort|s=s reverse|r offset=i remote! local! external! pretty + mua-cmd|mua=s no-torsocks torsocks=s verbose|v quiet|q + received-after=s received-before=s sent-after=s sent-since=s), + PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], 'show' => [ 'MID|OID', 'show a given object (Message-ID or object ID)', qw(type=s solve! format|f=s dedupe|d=s thread|t remote local!), pass_through('git show') ], -'add-extinbox' => [ 'URL_OR_PATHNAME', +'add-external' => [ 'URL_OR_PATHNAME', 'add/set priority of a publicinbox|extindex for extra matches', - qw(prio=i) ], -'ls-extinbox' => [ '[FILTER...]', 'list publicinbox|extindex locations', - qw(format|f=s z local remote) ], -'forget-extinbox' => [ '{URL_OR_PATHNAME|--prune}', + qw(boost=i quiet|q) ], +'ls-external' => [ '[FILTER...]', 'list publicinbox|extindex locations', + qw(format|f=s z|0 local remote quiet|q) ], +'forget-external' => [ 'URL_OR_PATHNAME...|--prune', 'exclude further results from a publicinbox|extindex', - qw(prune) ], + qw(prune quiet|q) ], 'ls-query' => [ '[FILTER...]', 'list saved search queries', qw(name-only format|f=s z) ], @@ -83,16 +133,16 @@ our %CMD = ( # sorted in order of importance/use: 'set/unset flags on message(s) from stdin', qw(stdin| oid=s exact by-mid|mid:s) ], 'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]', - 'exclude message(s) on stdin from query results', + "exclude message(s) on stdin from `q' search results", qw(stdin| oid=s exact by-mid|mid:s quiet|q) ], -'purge-mailsource' => [ '{URL_OR_PATHNAME|--all}', +'purge-mailsource' => [ 'URL_OR_PATHNAME|--all', 'remove imported messages from IMAP, Maildirs, and MH', qw(exact! all jobs:i indexed) ], # code repos are used for `show' to solve blobs from patch mails 'add-coderepo' => [ 'PATHNAME', 'add or set priority of a git code repo', - qw(prio=i) ], + qw(boost=i) ], 'ls-coderepo' => [ '[FILTER_TERMS...]', 'list known code repos', qw(format|f=s z) ], 'forget-coderepo' => [ 'PATHNAME', @@ -109,9 +159,9 @@ our %CMD = ( # sorted in order of importance/use: 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', qw(prune) ], -'import' => [ '{URL_OR_PATHNAME|--stdin}', +'import' => [ 'URL_OR_PATHNAME|--stdin', 'one-shot import/update from URL or filesystem', - qw(stdin| limit|n=i offset=i recursive|r exclude=s include=s !flags), + qw(stdin| offset=i recursive|r exclude=s include=s !flags), ], 'config' => [ '[...]', sub { @@ -121,10 +171,9 @@ our %CMD = ( # sorted in order of importance/use: 'init' => [ '[PATHNAME]', sub { 'initialize storage, default: '._store_path($_[0]); }, qw(quiet|q) ], -'daemon-stop' => [ '', 'stop the lei-daemon' ], +'daemon-kill' => [ '[-SIGNAL]', 'signal the lei-daemon', + opt_dash('signal|s=s', '[0-9]+|(?:[A-Z][A-Z0-9]+)') ], 'daemon-pid' => [ '', 'show the PID of the lei-daemon' ], -'daemon-env' => [ '[NAME=VALUE...]', 'set, unset, or show daemon environment', - qw(clear| unset|u=s@ z|0) ], 'help' => [ '[SUBCOMMAND]', 'show help' ], # XXX do we need this? @@ -132,7 +181,11 @@ our %CMD = ( # sorted in order of importance/use: 'reorder-local-store-and-break-history' => [ '[REFNAME]', 'rewrite git history in an attempt to improve compression', - 'gc!' ] + 'gc!' ], + +# internal commands are prefixed with '_' +'_complete' => [ '[...]', 'internal shell completion helper', + pass_through('everything') ], ); # @CMD # switch descriptions, try to keep consistent across commands @@ -147,38 +200,45 @@ my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ]; my %OPTDESC = ( 'help|h' => 'show this built-in help', 'quiet|q' => 'be quiet', +'verbose|v' => 'be more verbose', 'solve!' => 'do not attempt to reconstruct blobs from emails', +'torsocks=s' => ['auto|no|yes', + 'whether or not to wrap git and curl commands with torsocks'], +'no-torsocks' => 'alias for --torsocks=no', 'save-as=s' => ['NAME', 'save a search terms by given name'], 'type=s' => [ 'any|mid|git', 'disambiguate type' ], -'dedupe|d=s' => ['STRAT|content|oid|mid', +'dedupe|d=s' => ['STRAT|content|oid|mid|none', 'deduplication strategy'], 'show thread|t' => 'display entire thread a message belongs to', -'query thread|t' => +'q thread|t' => 'return all messages in the same thread as the actual match(es)', 'augment|a' => 'augment --output destination instead of clobbering', -'output|o=s' => [ 'DEST', +'output|mfolder|o=s' => [ 'DEST', "destination (e.g. `/path/to/Maildir', or `-' for stdout)" ], +'mua-cmd|mua=s' => [ 'COMMAND', + "MUA to run on --output Maildir or mbox (e.g. `mutt -f %f'" ], 'show format|f=s' => [ 'OUT|plain|raw|html|mboxrd|mboxcl2|mboxcl', 'message/object output format' ], 'mark format|f=s' => $stdin_formats, 'forget format|f=s' => $stdin_formats, -'query format|f=s' => [ 'OUT|maildir|mboxrd|mboxcl2|mboxcl|html|oid', +'q format|f=s' => [ + 'OUT|maildir|mboxrd|mboxcl2|mboxcl|mboxo|html|json|jsonl|concatjson', 'specify output format, default depends on --output'], 'ls-query format|f=s' => $ls_format, -'ls-extinbox format|f=s' => $ls_format, +'ls-external format|f=s' => $ls_format, -'limit|n=i' => ['NUM', - 'limit on number of matches (default: 10000)' ], +'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ], 'offset=i' => ['OFF', 'search result offset (default: 0)'], -'sort|s=s@' => [ 'VAL|internaldate,date,relevance,docid', +'sort|s=s' => [ 'VAL|received,relevance,docid', "order of results `--output'-dependent"], +'reverse|r' => [ 'reverse search results' ], # like sort(1) -'prio=i' => 'priority of query source', +'boost=i' => 'increase/decrease priority of results (default: 0)', 'local' => 'limit operations to the local filesystem', 'local!' => 'exclude results from the local filesystem', @@ -198,58 +258,175 @@ my %OPTDESC = ( 'by-mid|mid:s' => [ 'MID', 'match only by Message-ID, ignoring contents' ], 'jobs:i' => 'set parallelism level', -# xargs, env, use "-0", git(1) uses "-z". Should we support z|0 everywhere? -'z' => 'use NUL \\0 instead of newline (CR) to delimit lines', +# xargs, env, use "-0", git(1) uses "-z". We support z|0 everywhere 'z|0' => 'use NUL \\0 instead of newline (CR) to delimit lines', -# note: no "--ignore-environment" / "-i" support like env(1) since that -# is one-shot and this is for a persistent daemon: -'clear|' => 'clear the daemon environment', -'unset|u=s@' => ['NAME', - 'unset matching NAME, may be specified multiple times'], +'signal|s=s' => [ 'SIG', 'signal to send lei-daemon (default: TERM)' ], ); # %OPTDESC -sub x_it ($$) { # pronounced "exit" +my %CONFIG_KEYS = ( + 'leistore.dir' => 'top-level storage location', +); + +# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE +sub x_it ($$) { my ($self, $code) = @_; - if (my $sig = ($code & 127)) { - kill($sig, $self->{pid} // $$); - } else { - $code >>= 8; - if (my $sock = $self->{sock}) { - say $sock "exit=$code"; - } else { # for oneshot - $quit->($code); + # make sure client sees stdout before exit + $self->{1}->autoflush(1) if $self->{1}; + dump_and_clear_log(); + if (my $sock = $self->{sock}) { + send($sock, "x_it $code", MSG_EOR); + } elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13) + $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work + kill $signum, $$; + sleep; # wait for signal + } else { # oneshot + # don't want to end up using $? from child processes + for my $f (qw(lxs l2m)) { + my $wq = delete $self->{$f} or next; + $wq->DESTROY; } + $quit->($code >> 8); } } -sub emit { - my ($self, $channel) = @_; # $buf = $_[2] - print { $self->{$channel} } $_[2] or die "print FD[$channel]: $!"; +sub err ($;@) { + my $self = shift; + my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB}; + my $eor = (substr($_[-1], -1, 1) eq "\n" ? () : "\n"); + print $err @_, $eor and return; + my $old_err = delete $self->{2}; + close($old_err) if $! == EPIPE && $old_err;; + $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB}; + print $err @_, $eor or print STDERR @_, $eor; } -sub err { - my ($self, $buf) = @_; - $buf .= "\n" unless $buf =~ /\n\z/s; - emit($self, 2, $buf); -} - -sub qerr { $_[0]->{opt}->{quiet} or err(@_) } +sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; - err($self, $buf); + err($self, $buf) if defined $buf; + send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler x_it($self, ($exit_code // 1) << 8); undef; } +sub out ($;@) { + my $self = shift; + return if print { $self->{1} // return } @_; # likely + return note_sigpipe($self, 1) if $! == EPIPE; + my $err = "error writing to stdout: $!"; + delete $self->{1}; + fail($self, $err); +} + +sub puts ($;@) { out(shift, map { "$_\n" } @_) } + +sub child_error { # passes non-fatal curl exit codes to user + my ($self, $child_error) = @_; # child_error is $? + if (my $sock = $self->{sock}) { # send to lei(1) client + send($sock, "child_error $child_error", MSG_EOR); + } else { # oneshot + $self->{child_error} = $child_error; + } + undef; +} + +sub atfork_prepare_wq { + my ($self, $wq) = @_; + my $tcafc = $wq->{-ipc_atfork_child_close} //= [ $listener // () ]; + if (my $sock = $self->{sock}) { + push @$tcafc, @$self{qw(0 1 2 3)}, $sock; + } + if (my $pgr = $self->{pgr}) { + push @$tcafc, @$pgr[1,2]; + } + if (my $old_1 = $self->{old_1}) { + push @$tcafc, $old_1; + } + for my $f (qw(lxs l2m)) { + my $ipc = $self->{$f} or next; + push @$tcafc, grep { defined } + @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)}; + } +} + +sub io_restore ($$) { + my ($dst, $src) = @_; + for my $i (0..2) { # standard FDs + my $io = delete $src->{$i} or next; + $dst->{$i} = $io; + } + for my $i (3..9) { # named (non-standard) FDs + my $io = $src->{$i} or next; + my @st = stat($io) or die "stat $src.$i ($io): $!"; + my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next; + $dst->{$f} = $io; + delete $src->{$i}; + } +} + +sub note_sigpipe { # triggers sigpipe_handler + my ($self, $fd) = @_; + close(delete($self->{$fd})); # explicit close silences Perl warning + send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op}; + x_it($self, 13); +} + +sub atfork_child_wq { + my ($self, $wq) = @_; + io_restore($self, $wq); + -S $self->{pkt_op} or die 'BUG: {pkt_op} expected'; + io_restore($self->{l2m}, $wq); + %PATH2CFG = (); + undef $errors_log; + $quit = \&CORE::exit; + $current_lei = $self; # for SIG{__WARN__} +} + +sub io_extract ($;@) { + my ($obj, @fields) = @_; + my @io; + for my $f (@fields) { + my $io = delete $obj->{$f} or next; + my @st = stat($io) or die "W: stat $obj.$f ($io): $!"; + $obj->{"dev=$st[0],ino=$st[1]"} = $f; + push @io, $io; + } + @io +} + +# usage: ($lei, @io) = $lei->atfork_parent_wq($wq); +sub atfork_parent_wq { + my ($self, $wq) = @_; + my $env = delete $self->{env}; # env is inherited at fork + my $lei = bless { %$self }, ref($self); + for my $f (qw(dedupe ovv)) { + my $tmp = delete($lei->{$f}) or next; + $lei->{$f} = $wq->deep_clone($tmp); + } + $self->{env} = $env; + delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m + my @io = (delete(@$lei{qw(0 1 2)}), + io_extract($lei, qw(sock pkt_op startq))); + my $l2m = $lei->{l2m}; + if ($l2m && $l2m != $wq) { # $wq == lxs + if (my $wq_s1 = $l2m->{-wq_s1}) { + push @io, io_extract($l2m, '-wq_s1'); + $l2m->{-wq_s1} = $wq_s1; + } + $l2m->wq_close(1); + } + ($lei, @io); +} + sub _help ($;$) { my ($self, $errmsg) = @_; my $cmd = $self->{cmd} // 'COMMAND'; my @info = @{$CMD{$cmd} // [ '...', '...' ]}; my @top = ($cmd, shift(@info) // ()); my $cmd_desc = shift(@info); - $cmd_desc = $cmd_desc->($self->{env}) if ref($cmd_desc) eq 'CODE'; + $cmd_desc = $cmd_desc->($self) if ref($cmd_desc) eq 'CODE'; my @opt_desc; my $lpad = 2; for my $sw (grep { !ref } @info) { # ("prio=s", "z", $GLP_PASS) @@ -314,8 +491,7 @@ EOF $msg .= $rhs; $msg .= "\n"; } - my $channel = $errmsg ? 2 : 1; - emit($self, $channel, $msg); + print { $self->{$errmsg ? 2 : 1} } $msg; x_it($self, $errmsg ? 1 << 8 : 0); # stderr => failure undef; } @@ -323,23 +499,25 @@ EOF sub optparse ($$$) { my ($self, $cmd, $argv) = @_; $self->{cmd} = $cmd; - my $opt = $self->{opt} = {}; + $OPT = $self->{opt} = {}; my $info = $CMD{$cmd} // [ '[...]' ]; my ($proto, undef, @spec) = @$info; - my $glp = ref($spec[-1]) ? pop(@spec) : $GLP; # or $GLP_PASS + my $glp = ref($spec[-1]) eq ref($GLP) ? pop(@spec) : $GLP; push @spec, qw(help|h); my $lone_dash; if ($spec[0] =~ s/\|\z//s) { # "stdin|" or "clear|" allows "-" alias $lone_dash = $spec[0]; - $opt->{$spec[0]} = \(my $var); + $OPT->{$spec[0]} = \(my $var); push @spec, '' => \$var; } - $glp->getoptionsfromarray($argv, $opt, @spec) or + $glp->getoptionsfromarray($argv, $OPT, @spec) or return _help($self, "bad arguments or options for $cmd"); - return _help($self) if $opt->{help}; + return _help($self) if $OPT->{help}; + + push @$argv, @{$OPT->{-argv}} if defined($OPT->{-argv}); # "-" aliases "stdin" or "clear" - $opt->{$lone_dash} = ${$opt->{$lone_dash}} if defined $lone_dash; + $OPT->{$lone_dash} = ${$OPT->{$lone_dash}} if defined $lone_dash; my $i = 0; my $POS_ARG = '[A-Z][A-Z0-9_]+'; @@ -355,14 +533,14 @@ sub optparse ($$$) { } elsif ($var =~ /\.\.\.\]\z/) { # optional args start $inf = 1; last; - } elsif ($var =~ /\A\[$POS_ARG\]\z/) { # one optional arg + } elsif ($var =~ /\A\[-?$POS_ARG\]\z/) { # one optional arg $i++; } elsif ($var =~ /\A.+?\|/) { # required FOO|--stdin my @or = split(/\|/, $var); my $ok; for my $o (@or) { if ($o =~ /\A--([a-z0-9\-]+)/) { - $ok = defined($opt->{$1}); + $ok = defined($OPT->{$1}); last; } elsif (defined($argv->[$i])) { $ok = 1; @@ -370,6 +548,7 @@ sub optparse ($$$) { last; } # else continue looping } + last if $ok; my $last = pop @or; $err = join(', ', @or) . " or $last must be set"; } else { @@ -377,7 +556,6 @@ sub optparse ($$$) { } last if $err; } - # warn "inf=$inf ".scalar(@$argv). ' '.scalar(@args)."\n"; if (!$inf && scalar(@$argv) > scalar(@args)) { $err //= 'too many arguments'; } @@ -386,7 +564,8 @@ sub optparse ($$$) { sub dispatch { my ($self, $cmd, @argv) = @_; - local $SIG{__WARN__} = sub { err($self, "@_") }; + local $current_lei = $self; # for __WARN__ + dump_and_clear_log("from previous run\n"); return _help($self, 'no command given') unless defined($cmd); my $func = "lei_$cmd"; $func =~ tr/-/_/; @@ -405,7 +584,7 @@ sub dispatch { sub _lei_cfg ($;$) { my ($self, $creat) = @_; - my $f = _config_path($self->{env}); + my $f = _config_path($self); my @st = stat($f); my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case @@ -434,8 +613,8 @@ sub _lei_store ($;$) { my $cfg = _lei_cfg($self, $creat); $cfg->{-lei_store} //= do { require PublicInbox::LeiStore; - PublicInbox::SearchIdx::load_xapian_writable(); - defined(my $dir = $cfg->{'leistore.dir'}) or return; + my $dir = $cfg->{'leistore.dir'}; + $dir //= $creat ? _store_path($self) : return; PublicInbox::LeiStore->new($dir, { creat => $creat }); }; } @@ -444,36 +623,37 @@ sub lei_show { my ($self, @argv) = @_; } -sub lei_query { - my ($self, @argv) = @_; -} - sub lei_mark { my ($self, @argv) = @_; } -sub lei_config { +sub _config { my ($self, @argv) = @_; - $self->{opt}->{'config-file'} and return fail $self, - "config file switches not supported by `lei config'"; my $env = $self->{env}; delete local $env->{GIT_CONFIG}; + delete local $ENV{GIT_CONFIG}; my $cfg = _lei_cfg($self, 1); my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ]; my %rdr = map { $_ => $self->{$_} } (0..2); - require PublicInbox::Import; - PublicInbox::Import::run_die($cmd, $env, \%rdr); + waitpid(spawn($cmd, $env, \%rdr), 0); +} + +sub lei_config { + my ($self, @argv) = @_; + $self->{opt}->{'config-file'} and return fail $self, + "config file switches not supported by `lei config'"; + _config(@_); + x_it($self, $?) if $?; } sub lei_init { my ($self, $dir) = @_; my $cfg = _lei_cfg($self, 1); my $cur = $cfg->{'leistore.dir'}; - my $env = $self->{env}; - $dir //= _store_path($env); - $dir = File::Spec->rel2abs($dir, $env->{PWD}); # PWD is symlink-aware + $dir //= _store_path($self); + $dir = rel2abs($self, $dir); my @cur = stat($cur) if defined($cur); - $cur = File::Spec->canonpath($cur) if $cur; + $cur = File::Spec->canonpath($cur // $dir); my @dir = stat($dir); my $exists = "I: leistore.dir=$cur already initialized" if @dir; if (@cur) { @@ -483,7 +663,7 @@ sub lei_init { } # some folks like symlinks and bind mounts :P - if (@dir && "$cur[0] $cur[1]" eq "$dir[0] $dir[1]") { + if (@dir && "@cur[1,0]" eq "@dir[1,0]") { lei_config($self, 'leistore.dir', $dir); _lei_store($self, 1)->done; return qerr($self, "$exists (as $cur)"); @@ -498,130 +678,307 @@ E: leistore.dir=$cur already initialized and it is not $dir return qerr($self, $exists); } -sub lei_daemon_pid { emit($_[0], 1, "$$\n") } +sub lei_daemon_pid { puts shift, $$ } -sub lei_daemon_stop { $quit->(0) } +sub lei_daemon_kill { + my ($self) = @_; + my $sig = $self->{opt}->{signal} // 'TERM'; + kill($sig, $$) or fail($self, "kill($sig, $$): $!"); +} -sub lei_daemon_env { - my ($self, @argv) = @_; - my $opt = $self->{opt}; - if (defined $opt->{clear}) { - %ENV = (); - } elsif (my $u = $opt->{unset}) { - delete @ENV{@$u}; +sub lei_help { _help($_[0]) } + +# Shell completion helper. Used by lei-completion.bash and hopefully +# other shells. Try to do as much here as possible to avoid redundancy +# and improve maintainability. +sub lei__complete { + my ($self, @argv) = @_; # argv = qw(lei and any other args...) + shift @argv; # ignore "lei", the entire command is sent + @argv or return puts $self, grep(!/^_/, keys %CMD), qw(--help -h); + my $cmd = shift @argv; + my $info = $CMD{$cmd} // do { # filter matching commands + @argv or puts $self, grep(/\A\Q$cmd\E/, keys %CMD); + return; + }; + my ($proto, undef, @spec) = @$info; + my $cur = pop @argv; + my $re = defined($cur) ? qr/\A\Q$cur\E/ : qr/./; + if (substr($cur // '-', 0, 1) eq '-') { # --switches + # gross special case since the only git-config options + # Consider moving to a table if we need more special cases + # we use Getopt::Long for are the ones we reject, so these + # are the ones we don't reject: + if ($cmd eq 'config') { + puts $self, grep(/$re/, keys %CONFIG_KEYS); + @spec = qw(add z|null get get-all unset unset-all + replace-all get-urlmatch + remove-section rename-section + name-only list|l edit|e + get-color-name get-colorbool); + # fall-through + } + puts $self, grep(/$re/, map { # generate short/long names + if (s/[:=].+\z//) { # req/optional args, e.g output|o=i + } else { # negation: solve! => no-solve|solve + s/\A(.+)!\z/no-$1|$1/; + } + map { + my $x = length > 1 ? "--$_" : "-$_"; + $x eq $cur ? () : $x; + } split(/\|/, $_, -1) # help|h + } grep { $OPTDESC{"$cmd\t$_"} || $OPTDESC{$_} } @spec); + } elsif ($cmd eq 'config' && !@argv && !$CONFIG_KEYS{$cur}) { + puts $self, grep(/$re/, keys %CONFIG_KEYS); + } + + # switch args (e.g. lei q -f mbox) + if (($argv[-1] // $cur // '') =~ /\A--?([\w\-]+)\z/) { + my $opt = quotemeta $1; + puts $self, map { + my $v = $OPTDESC{$_}; + $v = $v->[0] if ref($v); + my @v = split(/\|/, $v); + # get rid of ALL CAPS placeholder (e.g "OUT") + # (TODO: completion for external paths) + shift(@v) if uc($v[0]) eq $v[0]; + @v; + } grep(/\A(?:$cmd\t|)(?:[\w-]+\|)*$opt\b/, keys %OPTDESC); } - if (@argv) { - %ENV = (%ENV, map { split(/=/, $_, 2) } @argv); - } elsif (!defined($opt->{clear}) && !$opt->{unset}) { - my $eor = $opt->{z} ? "\0" : "\n"; - my $buf = ''; - while (my ($k, $v) = each %ENV) { $buf .= "$k=$v$eor" } - emit($self, 1, $buf) + $cmd =~ tr/-/_/; + if (my $sub = $self->can("_complete_$cmd")) { + puts $self, $sub->($self, @argv, $cur); } + # TODO: URLs, pathnames, OIDs, MIDs, etc... See optparse() for + # proto parsing. } -sub lei_help { _help($_[0]) } +sub exec_buf ($$) { + my ($argv, $env) = @_; + my $argc = scalar @$argv; + my $buf = 'exec '.join("\0", scalar(@$argv), @$argv); + while (my ($k, $v) = each %$env) { $buf .= "\0$k=$v" }; + $buf; +} -sub reap_exec { # dwaitpid callback - my ($self, $pid) = @_; - x_it($self, $?); +sub start_mua { + my ($self) = @_; + my $mua = $self->{opt}->{'mua-cmd'} // return; + my $mfolder = $self->{ovv}->{dst}; + my (@cmd, $replaced); + if ($mua =~ /\A(?:mutt|mailx|mail|neomutt)\z/) { + @cmd = ($mua, '-f'); + # TODO: help wanted: other common FOSS MUAs + } else { + require Text::ParseWords; + my @cmd = Text::ParseWords::shellwords($mua); + # mutt uses '%f' for open-hook with compressed mbox, we follow + @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd; + } + push @cmd, $mfolder unless defined($replaced); + if (my $sock = $self->{sock}) { # lei(1) client process runs it + send($sock, exec_buf(\@cmd, {}), MSG_EOR); + } else { # oneshot + $self->{"mua.pid.$self.$$"} = spawn(\@cmd); + } } -sub lei_git { # support passing through random git commands - my ($self, @argv) = @_; - my %rdr = map { $_ => $self->{$_} } (0..2); - my $pid = spawn(['git', @argv], $self->{env}, \%rdr); - PublicInbox::DS::dwaitpid($pid, \&reap_exec, $self); +# caller needs to "-t $self->{1}" to check if tty +sub start_pager { + my ($self) = @_; + my $env = $self->{env}; + my $fh = popen_rd([qw(git var GIT_PAGER)], $env); + chomp(my $pager = <$fh> // ''); + close($fh) or warn "`git var PAGER' error: \$?=$?"; + return if $pager eq 'cat' || $pager eq ''; + # TODO TIOCGWINSZ + my $new_env = { LESS => 'FRX', LV => '-c', COLUMNS => 80 }; + $new_env->{MORE} = 'FRX' if $^O eq 'freebsd'; + pipe(my ($r, $wpager)) or return warn "pipe: $!"; + my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; + my $pgr = [ undef, @$rdr{1, 2}, $$ ]; + if (my $sock = $self->{sock}) { # lei(1) process runs it + delete @$new_env{keys %$env}; # only set iff unset + my $fds = [ map { fileno($_) } @$rdr{0..2} ]; + $send_cmd->($sock, $fds, exec_buf([$pager], $new_env), MSG_EOR); + } else { + $pgr->[0] = spawn([$pager], $new_env, $rdr); + } + $self->{1} = $wpager; + $self->{2} = $wpager if -t $self->{2}; + $env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git + $self->{pgr} = $pgr; +} + +sub stop_pager { + my ($self) = @_; + my $pgr = delete($self->{pgr}) or return; + $self->{2} = $pgr->[2]; + # do not restore original stdout, just close it so we error out + close(delete($self->{1})) if $self->{1}; + my $pid = $pgr->[0]; + dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$; } sub accept_dispatch { # Listener {post_accept} callback my ($sock) = @_; # ignore other - $sock->blocking(1); $sock->autoflush(1); my $self = bless { sock => $sock }, __PACKAGE__; - vec(my $rin = '', fileno($sock), 1) = 1; - # `say $sock' triggers "die" in lei(1) - for my $i (0..2) { - if (select(my $rout = $rin, undef, undef, 1)) { - my $fd = IO::FDPass::recv(fileno($sock)); - if ($fd >= 0) { - my $rdr = ($fd == 0 ? '<&=' : '>&='); - if (open(my $fh, $rdr, $fd)) { - $self->{$i} = $fh; - } else { - say $sock "open($rdr$fd) (FD=$i): $!"; - return; - } - } else { - say $sock "recv FD=$i: $!"; - return; - } - } else { - say $sock "timed out waiting to recv FD=$i"; - return; + vec(my $rvec = '', fileno($sock), 1) = 1; + select($rvec, undef, undef, 60) or + return send($sock, 'timed out waiting to recv FDs', MSG_EOR); + my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN + if (scalar(@fds) == 4) { + for my $i (0..3) { + my $fd = shift(@fds); + open($self->{$i}, '+<&=', $fd) and next; + send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); } + } else { + my $msg = "recv_cmd failed: $!"; + warn $msg; + return send($sock, $msg, MSG_EOR); } - # $ARGV_STR = join("]\0[", @ARGV); - # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV); - # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0"; - my ($client_pid, $argv, $env) = do { - local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2 - chomp(my $line = <$sock>); - split(/\0\0>/, $line, 3); - }; - my %env = map { split(/=/, $_, 2) } split(/\0/, $env); - if (chdir($env{PWD})) { + $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY + # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); + # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; + substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// + return send($sock, 'request command truncated', MSG_EOR); + my ($argc, @argv) = split(/\0/, $buf, -1); + undef $buf; + my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); + if (chdir($self->{3})) { + local %ENV = %env; $self->{env} = \%env; - $self->{pid} = $client_pid; - eval { dispatch($self, split(/\]\0\[/, $argv)) }; - say $sock $@ if $@; + eval { dispatch($self, @argv) }; + send($sock, $@, MSG_EOR) if $@; } else { - say $sock "chdir($env{PWD}): $!"; # implicit close + send($sock, "fchdir: $!", MSG_EOR); # implicit close + } +} + +sub dclose { + my ($self) = @_; + for my $f (qw(lxs l2m)) { + my $wq = delete $self->{$f} or next; + if ($wq->wq_kill) { + $wq->wq_close + } elsif ($wq->wq_kill_old) { + $wq->wq_wait_old($self); + } + } + close(delete $self->{1}) if $self->{1}; # may reap_compress + $self->close if $self->{sock}; # PublicInbox::DS::close +} + +# for long-running results +sub event_step { + my ($self) = @_; + local %ENV = %{$self->{env}}; + my $sock = $self->{sock}; + local $current_lei = $self; + eval { + while (my @fds = $recv_cmd->($sock, my $buf, 4096)) { + if (scalar(@fds) == 1 && !defined($fds[0])) { + return if $! == EAGAIN; + next if $! == EINTR; + last if $! == ECONNRESET; + die "recvmsg: $!"; + } + for my $fd (@fds) { + open my $rfh, '+<&=', $fd; + } + die "unrecognized client signal: $buf"; + } + dclose($self); + }; + if (my $err = $@) { + eval { $self->fail($err) }; + dclose($self); + } +} + +sub event_step_init { + my ($self) = @_; + if (my $sock = $self->{sock}) { # using DS->EventLoop + $self->SUPER::new($sock, EPOLLIN|EPOLLET); } } sub noop {} +our $oldset; sub oldset { $oldset } + +sub dump_and_clear_log { + if (defined($errors_log) && -s STDIN && seek(STDIN, 0, SEEK_SET)) { + my @pfx = @_; + unshift(@pfx, "$errors_log ") if @pfx; + warn @pfx, do { local $/; }; + truncate(STDIN, 0) or warn "ftruncate ($errors_log): $!"; + } +} + # lei(1) calls this when it can't connect sub lazy_start { - my ($path, $errno) = @_; + my ($path, $errno, $narg) = @_; if ($errno == ECONNREFUSED) { unlink($path) or die "unlink($path): $!"; } elsif ($errno != ENOENT) { $! = $errno; # allow interpolation to stringify in die die "connect($path): $!"; } + if (eval { require BSD::Resource }) { + my $NOFILE = BSD::Resource::RLIMIT_NOFILE(); + my ($s, $h) = BSD::Resource::getrlimit($NOFILE); + BSD::Resource::setrlimit($NOFILE, $h, $h) if $s < $h; + } umask(077) // die("umask(077): $!"); - socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!"; - bind($l, pack_sockaddr_un($path)) or die "bind($path): $!"; - listen($l, 1024) or die "listen: $!"; + local $listener; + socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; + bind($listener, pack_sockaddr_un($path)) or die "bind($path): $!"; + listen($listener, 1024) or die "listen: $!"; my @st = stat($path) or die "stat($path): $!"; my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino - pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; - my $oldset = PublicInbox::Sigfd::block_signals(); - require IO::FDPass; + local $oldset = PublicInbox::DS::block_signals(); + if ($narg == 5) { + $send_cmd = PublicInbox::Spawn->can('send_cmd4'); + $recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do { + require PublicInbox::CmdIPC4; + $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4'); + PublicInbox::CmdIPC4->can('recv_cmd4'); + }; + } + $recv_cmd or die <<""; +(Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg); + require PublicInbox::Listener; require PublicInbox::EOFpipe; - (-p STDOUT && -p STDERR) or die "E: stdout+stderr must be pipes\n"; - open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!"; + (-p STDOUT) or die "E: stdout must be a pipe\n"; + local $errors_log; + ($errors_log) = ($path =~ m!\A(.+?/)[^/]+\z!); + $errors_log .= 'errors.log'; + open(STDIN, '+>>', $errors_log) or die "open($errors_log): $!"; + STDIN->autoflush(1); + dump_and_clear_log("from previous daemon process:\n"); POSIX::setsid() > 0 or die "setsid: $!"; my $pid = fork // die "fork: $!"; return if $pid; $0 = "lei-daemon $path"; local %PATH2CFG; - $_->blocking(0) for ($l, $eof_r, $eof_w); - $l = PublicInbox::Listener->new($l, \&accept_dispatch, $l); + $listener->blocking(0); my $exit_code; - local $quit = sub { - $exit_code //= shift; - my $listener = $l or exit($exit_code); - unlink($path) if defined($path); - # closing eof_w triggers \&noop wakeup - $eof_w = $l = $path = undef; - $listener->close; # DS::close - PublicInbox::DS->SetLoopTimeout(1000); + my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch); + local $quit = do { + pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; + PublicInbox::EOFpipe->new($eof_r, \&noop, undef); + sub { + $exit_code //= shift; + my $lis = $pil or exit($exit_code); + # closing eof_w triggers \&noop wakeup + $listener = $eof_w = $pil = $path = undef; + $lis->close; # DS::close + PublicInbox::DS->SetLoopTimeout(1000); + }; }; - PublicInbox::EOFpipe->new($eof_r, \&noop, undef); my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap, QUIT => $quit, @@ -631,14 +988,17 @@ sub lazy_start { USR1 => \&noop, USR2 => \&noop, }; - my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK); - local %SIG = (%SIG, %$sig) if !$sigfd; + my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); + local @SIG{keys %$sig} = values(%$sig) unless $sigfd; + undef $sig; + local $SIG{PIPE} = 'IGNORE'; if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets + undef $sigfd; PublicInbox::DS->SetLoopTimeout(5000); } else { # wake up every second to accept signals if we don't # have signalfd or IO::KQueue: - PublicInbox::Sigfd::sig_setmask($oldset); + PublicInbox::DS::sig_setmask($oldset); PublicInbox::DS->SetLoopTimeout(1000); } PublicInbox::DS->SetPostLoopCallback(sub { @@ -648,9 +1008,9 @@ sub lazy_start { warn "$path dev/ino changed, quitting\n"; $path = undef; } - } elsif (defined($path)) { - warn "stat($path): $!, quitting ...\n"; - undef $path; # don't unlink + } elsif (defined($path)) { # ENOENT is common + warn "stat($path): $!, quitting ...\n" if $! != ENOENT; + undef $path; $quit->(); } return 1 if defined($path); @@ -667,36 +1027,46 @@ sub lazy_start { $n; # true: continue, false: stop }); - # STDIN was redirected to /dev/null above, closing STDOUT and - # STDERR will cause the calling `lei' client process to finish - # reading <$daemon> pipe. - open STDOUT, '>&STDIN' or die "redirect stdout failed: $!"; - openlog($path, 'pid', 'user'); - local $SIG{__WARN__} = sub { syslog('warning', "@_") }; - my $owner_pid = $$; - my $on_destroy = PublicInbox::OnDestroy->new(sub { - syslog('crit', "$@") if $@ && $$ == $owner_pid; - }); + # STDIN was redirected to /dev/null above, closing STDERR and + # STDOUT will cause the calling `lei' client process to finish + # reading the <$daemon> pipe. + local $SIG{__WARN__} = sub { + $current_lei ? err($current_lei, @_) : warn( + strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(time))," $$ ", @_); + }; open STDERR, '>&STDIN' or die "redirect stderr failed: $!"; + open STDOUT, '>&STDIN' or die "redirect stdout failed: $!"; # $daemon pipe to `lei' closed, main loop begins: PublicInbox::DS->EventLoop; - @$on_destroy = (); # cancel on_destroy if we get here exit($exit_code // 0); } -# for users w/o IO::FDPass +# for users w/o Socket::Msghdr installed or Inline::C enabled sub oneshot { my ($main_pkg) = @_; my $exit = $main_pkg->can('exit'); # caller may override exit() local $quit = $exit if $exit; local %PATH2CFG; umask(077) // die("umask(077): $!"); - dispatch((bless { - 0 => *STDIN{IO}, - 1 => *STDOUT{IO}, - 2 => *STDERR{IO}, + my $self = bless { + 0 => *STDIN{GLOB}, + 1 => *STDOUT{GLOB}, + 2 => *STDERR{GLOB}, env => \%ENV - }, __PACKAGE__), @ARGV); + }, __PACKAGE__; + dispatch($self, @ARGV); + x_it($self, $self->{child_error}) if $self->{child_error}; +} + +# ensures stdout hits the FS before sock disconnects so a client +# can immediately reread it +sub DESTROY { + my ($self) = @_; + $self->{1}->autoflush(1) if $self->{1}; + stop_pager($self); + if (my $mua_pid = delete $self->{"mua.pid.$self.$$"}) { + waitpid($mua_pid, 0); + } } 1;