X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FLEI.pm;h=8adf70faf2d70dbcf629e103bdf9fd589b2673da;hb=961690bae47c90a4a6960952587c6f4463fb4b19;hp=ddc2736192d877e7488e5ef15d46d2e44fcdfd94;hpb=9dacb984fc80c9cf9be17ca13aba97033fb1d485;p=public-inbox.git diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ddc27361..a7ddc21f 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Backend for `lei' (local email interface). Unlike the C10K-oriented @@ -12,29 +12,33 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); use Getopt::Long (); use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); -use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); +use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); use Cwd qw(getcwd); use POSIX qw(strftime); use IO::Handle (); use Fcntl qw(SEEK_SET); use PublicInbox::Config; -use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET); -use PublicInbox::Sigfd; -use PublicInbox::DS qw(now dwaitpid); +use PublicInbox::Syscall qw(EPOLLIN); +use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::Lock; +use PublicInbox::Eml; +use PublicInbox::Import; +use PublicInbox::ContentHash qw(git_sha); use Time::HiRes qw(stat); # ctime comparisons for config cache use File::Path qw(mkpath); use File::Spec; +use Sys::Syslog qw(openlog syslog closelog); our $quit = \&CORE::exit; -our ($current_lei, $errors_log, $listener, $oldset); -my ($recv_cmd, $send_cmd); +our ($current_lei, $errors_log, $listener, $oldset, $dir_idle, + $recv_cmd, $send_cmd); my $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); my $GLP_PASS = Getopt::Long::Parser->new; $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through)); our %PATH2CFG; # persistent for socket daemon +our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] } # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: @@ -64,32 +68,46 @@ sub opt_dash ($$) { ($spec, '<>' => $cb, $GLP_PASS) # for Getopt::Long } -sub rel2abs ($$) { +# rel2abs preserves symlinks in parent, unlike abs_path +sub rel2abs { my ($self, $p) = @_; - return $p if index($p, '/') == 0; # already absolute + if (index($p, '/') == 0) { # already absolute + $p =~ tr!/!/!s; # squeeze redundant slashes + chop($p) if substr($p, -1, 1) eq '/'; + return $p; + } my $pwd = $self->{env}->{PWD}; if (defined $pwd) { - my $cwd = $self->{3} // getcwd() // die "getcwd(PWD=$pwd): $!"; if (my @st_pwd = stat($pwd)) { - my @st_cwd = stat($cwd) or die "stat($cwd): $!"; + my @st_cwd = stat($self->{3}) or die "stat({3}): $!"; "@st_pwd[1,0]" eq "@st_cwd[1,0]" or - $self->{env}->{PWD} = $pwd = $cwd; + $self->{env}->{PWD} = $pwd = undef; } else { # PWD was invalid - delete $self->{env}->{PWD}; - undef $pwd; + $self->{env}->{PWD} = $pwd = undef; } } - $pwd //= $self->{env}->{PWD} = getcwd() // die "getcwd(PWD=$pwd): $!"; + $pwd //= $self->{env}->{PWD} = getcwd() // die "getcwd: $!"; File::Spec->rel2abs($p, $pwd); } -sub _store_path ($) { +# abs_path resolves symlinks in parent iff all parents exist +sub abs_path { Cwd::abs_path($_[1]) // rel2abs(@_) } + +sub canonpath_harder { + my $p = $_[-1]; # $_[0] may be self + $p = File::Spec->canonpath($p); + $p =~ m!(?:/*|\A)\.\.(?:/*|\z)! && -e $p ? Cwd::abs_path($p) : $p; +} + +sub share_path ($) { # $HOME/.local/share/lei/$FOO my ($self) = @_; rel2abs($self, ($self->{env}->{XDG_DATA_HOME} // ($self->{env}->{HOME} // '/nonexistent').'/.local/share') - .'/lei/store'); + .'/lei'); } +sub store_path ($) { share_path($_[0]) . '/store' } + sub _config_path ($) { my ($self) = @_; rel2abs($self, ($self->{env}->{XDG_CONFIG_HOME} // @@ -97,112 +115,191 @@ sub _config_path ($) { .'/lei/config'); } +sub cache_dir ($) { + my ($self) = @_; + rel2abs($self, ($self->{env}->{XDG_CACHE_HOME} // + ($self->{env}->{HOME} // '/nonexistent').'/.cache') + .'/lei'); +} + +sub url_folder_cache { + my ($self) = @_; + require PublicInbox::SharedKV; # URI => updated_at_sec_ + PublicInbox::SharedKV->new(cache_dir($self).'/uri_folder'); +} + +sub ale { + my ($self) = @_; + $self->{ale} // do { + require PublicInbox::LeiALE; + my $cfg = $self->_lei_cfg(1); + $self->{ale} = $cfg->{ale} //= PublicInbox::LeiALE->new($self); + }; +} + sub index_opt { # TODO: drop underscore variants everywhere, they're undocumented qw(fsync|sync! jobs|j=i indexlevel|L=s compact - max_size|max-size=s sequential_shard|sequential-shard + max_size|max-size=s sequential-shard batch_size|batch-size=s skip-docdata) } +my @c_opt = qw(c=s@ C=s@ quiet|q); +my @net_opt = (qw(no-torsocks torsocks=s), PublicInbox::LeiQuery::curl_opt()); +my @lxs_opt = qw(remote! local! external! include|I=s@ exclude=s@ only|O=s@ + import-remote!); + +# we don't support -C as an alias for --find-copies since it's already +# used for chdir +our @diff_opt = qw(unified|U=i output-indicator-new=s output-indicator-old=s + output-indicator-context=s indent-heuristic! + minimal patience histogram anchored=s@ diff-algorithm=s + color-moved:s color-moved-ws=s no-color-moved no-color-moved-ws + word-diff:s word-diff-regex=s color-words:s no-renames + rename-empty! check ws-error-highlight=s full-index binary + abbrev:i break-rewrites|B:s find-renames|M:s find-copies:s + find-copies-harder irreversible-delete|D l=i diff-filter=s + S=s G=s find-object=s pickaxe-all pickaxe-regex O=s R + relative:s text|a ignore-cr-at-eol ignore-space-at-eol + ignore-space-change|b ignore-all-space|w ignore-blank-lines + inter-hunk-context=i function-context|W exit-code ext-diff + no-ext-diff textconv! src-prefix=s dst-prefix=s no-prefix + line-prefix=s); + # we generate shell completion + help using %CMD and %OPTDESC, # see lei__complete() and PublicInbox::LeiHelp # command => [ positional_args, 1-line description, Getopt::Long option spec ] our %CMD = ( # sorted in order of importance/use: 'q' => [ '--stdin|SEARCH_TERMS...', 'search for messages matching terms', 'stdin|', # /|\z/ must be first for lone dash - qw(save-as=s output|mfolder|o=s format|f=s dedupe|d=s threads|t+ - sort|s=s reverse|r offset=i remote! local! external! pretty - include|I=s@ exclude=s@ only=s@ jobs|j=s globoff|g augment|a - import-remote! import-before! lock=s@ - alert=s@ mua=s no-torsocks torsocks=s verbose|v+ quiet|q C=s@), - PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], + @lxs_opt, @net_opt, + qw(save! output|mfolder|o=s format|f=s dedupe|d=s threads|t+ + sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a + import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+ + shared color! mail-sync!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], -'show' => [ 'MID|OID', 'show a given object (Message-ID or object ID)', - qw(type=s solve! format|f=s dedupe|d=s threads|t remote local! C=s@), - pass_through('git show') ], +'up' => [ 'OUTPUT...|--all', 'update saved search', + qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+ exclude=s@ + remote-fudge-time=s all:s remote! local! external!), @net_opt, @c_opt ], + +'lcat' => [ '--stdin|MSGID_OR_URL...', 'display local copy of message(s)', + 'stdin|', # /|\z/ must be first for lone dash + # some of these options are ridiculous for lcat + @lxs_opt, @net_opt, + qw(output|mfolder|o=s format|f=s dedupe|d=s threads|t+ + sort|s=s reverse|r offset=i jobs|j=s globoff|g augment|a + import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+ + color!), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], + +'blob' => [ 'OID', 'show a git blob, reconstructing from mail if necessary', + qw(git-dir=s@ cwd! verbose|v+ mail! oid-a|A=s path-a|a=s path-b|b=s), + @lxs_opt, @net_opt, @c_opt ], + +'rediff' => [ '--stdin|LOCATION...', + 'regenerate a diff with different options', + 'stdin|', # /|\z/ must be first for lone dash + qw(git-dir=s@ cwd! verbose|v+ color:s no-color drq:1 dequote-only:1), + @diff_opt, @lxs_opt, @net_opt, @c_opt ], + +'mail-diff' => [ '--stdin|LOCATION...', 'diff the contents of emails', + 'stdin|', # /|\z/ must be first for lone dash + qw(verbose|v+ in-format|F=s color:s no-color raw-header), + @diff_opt, @net_opt, @c_opt ], 'add-external' => [ 'LOCATION', 'add/set priority of a publicinbox|extindex for extra matches', - qw(boost=i c=s@ mirror=s no-torsocks torsocks=s inbox-version=i), - qw(quiet|q verbose|v+ C=s@), - index_opt(), PublicInbox::LeiQuery::curl_opt() ], + qw(boost=i mirror=s inbox-version=i epoch=s verbose|v+), + @c_opt, index_opt(), @net_opt ], 'ls-external' => [ '[FILTER]', 'list publicinbox|extindex locations', - qw(format|f=s z|0 globoff|g invert-match|v local remote C=s@) ], + qw(format|f=s z|0 globoff|g invert-match|v local remote), @c_opt ], +'ls-label' => [ '', 'list labels', qw(z|0 stats:s), @c_opt ], +'ls-mail-sync' => [ '[FILTER]', 'list mail sync folders', + qw(z|0 globoff|g invert-match|v local remote), @c_opt ], +'ls-mail-source' => [ 'URL', 'list IMAP or NNTP mail source folders', + qw(z|0 ascii l pretty url), @net_opt, @c_opt ], 'forget-external' => [ 'LOCATION...|--prune', 'exclude further results from a publicinbox|extindex', - qw(prune quiet|q C=s@) ], - -'ls-query' => [ '[FILTER...]', 'list saved search queries', - qw(name-only format|f=s z C=s@) ], -'rm-query' => [ 'QUERY_NAME', 'remove a saved search', qw(C=s@) ], -'mv-query' => [ qw(OLD_NAME NEW_NAME), 'rename a saved search', qw(C=s@) ], - + qw(prune), @c_opt ], + +'ls-search' => [ '[PREFIX]', 'list saved search queries', + qw(format|f=s pretty l ascii z|0), @c_opt ], +'forget-search' => [ 'OUTPUT...|--prune', 'forget a saved search', + qw(verbose|v+ prune:s), @c_opt ], +'edit-search' => [ 'OUTPUT', "edit saved search via `git config --edit'", + @c_opt ], +'rm' => [ '--stdin|LOCATION...', + 'remove a message from the index and prevent reindexing', + 'stdin|', # /|\z/ must be first for lone dash + qw(in-format|F=s lock=s@), @net_opt, @c_opt ], 'plonk' => [ '--threads|--from=IDENT', 'exclude mail matching From: or threads from non-Message-ID searches', - qw(stdin| threads|t from|f=s mid=s oid=s C=s@) ], -'mark' => [ 'MESSAGE_FLAGS...', - 'set/unset keywords on message(s) from stdin', - qw(stdin| oid=s exact by-mid|mid:s C=s@) ], -'forget' => [ '[--stdin|--oid=OID|--by-mid=MID]', - "exclude message(s) on stdin from `q' search results", - qw(stdin| oid=s exact by-mid|mid:s quiet|q C=s@) ], + qw(stdin| threads|t from|f=s mid=s oid=s), @c_opt ], +'tag' => [ 'KEYWORDS...', + 'set/unset keywords and/or labels on message(s)', + qw(stdin| in-format|F=s input|i=s@ oid=s@ mid=s@), + @net_opt, @c_opt, pass_through('-kw:foo for delete') ], 'purge-mailsource' => [ 'LOCATION|--all', 'remove imported messages from IMAP, Maildirs, and MH', - qw(exact! all jobs:i indexed C=s@) ], - -# code repos are used for `show' to solve blobs from patch mails -'add-coderepo' => [ 'DIRNAME', 'add or set priority of a git code repo', - qw(boost=i C=s@) ], -'ls-coderepo' => [ '[FILTER_TERMS...]', - 'list known code repos', qw(format|f=s z C=s@) ], -'forget-coderepo' => [ 'DIRNAME', - 'stop using repo to solve blobs from patches', - qw(prune C=s@) ], - -'add-watch' => [ 'LOCATION', 'watch for new messages and flag changes', - qw(import! kw|keywords|flags! interval=s recursive|r - exclude=s include=s C=s@) ], + qw(exact! all jobs:i indexed), @c_opt ], + +'add-watch' => [ 'LOCATION...', 'watch for new messages and flag changes', + qw(poll-interval=s state=s recursive|r), @c_opt ], +'rm-watch' => [ 'LOCATION...', 'remove specified watch(es)', + qw(recursive|r), @c_opt ], 'ls-watch' => [ '[FILTER...]', 'list active watches with numbers and status', - qw(format|f=s z C=s@) ], -'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote C=s@) ], -'resume-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote C=s@) ], + qw(l z|0), @c_opt ], +'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote), @c_opt ], +'resume-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote), @c_opt ], 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', - qw(prune C=s@) ], + qw(prune), @c_opt ], +'index' => [ 'LOCATION...', 'one-time index from URL or filesystem', + qw(in-format|F=s kw! offset=i recursive|r exclude=s include|I=s + verbose|v+ incremental!), @net_opt, # mainly for --proxy= + @c_opt ], 'import' => [ 'LOCATION...|--stdin', 'one-time import/update from URL or filesystem', - qw(stdin| offset=i recursive|r exclude=s include|I=s - lock=s@ in-format|F=s kw|keywords|flags! C=s@), - ], + qw(stdin| offset=i recursive|r exclude=s include|I=s new-only + lock=s@ in-format|F=s kw! verbose|v+ incremental! mail-sync!), + @net_opt, @c_opt ], +'forget-mail-sync' => [ 'LOCATION...', + 'forget sync information for a mail folder', @c_opt ], +'refresh-mail-sync' => [ 'LOCATION...|--all', + 'prune dangling sync data for a mail folder', 'all:s', + @net_opt, @c_opt ], +'export-kw' => [ 'LOCATION...|--all', + 'one-time export of keywords of sync sources', + qw(all:s mode=s), @net_opt, @c_opt ], 'convert' => [ 'LOCATION...|--stdin', 'one-time conversion from URL or filesystem to another format', - qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s quiet|q - lock=s@ kw|keywords|flags! C=s@), - ], -'p2q' => [ 'FILE|COMMIT_OID|--stdin', + qw(stdin| in-format|F=s out-format|f=s output|mfolder|o=s lock=s@ kw!), + @net_opt, @c_opt ], +'p2q' => [ 'LOCATION_OR_COMMIT...|--stdin', "use a patch to generate a query for `lei q --stdin'", - qw(stdin| want|w=s@ uri debug) ], + qw(stdin| in-format|F=s want|w=s@ uri debug), @net_opt, @c_opt ], 'config' => [ '[...]', sub { 'git-config(1) wrapper for '._config_path($_[0]); }, qw(config-file|system|global|file|f=s), # for conflict detection - qw(C=s@), pass_through('git config') ], + qw(edit|e c=s@ C=s@), pass_through('git config') ], +'inspect' => [ 'ITEMS...|--stdin', 'inspect lei/store and/or local external', + qw(stdin| pretty ascii dir|d=s), @c_opt ], + 'init' => [ '[DIRNAME]', sub { - "initialize storage, default: "._store_path($_[0]); - }, qw(quiet|q C=s@) ], + "initialize storage, default: ".store_path($_[0]); + }, @c_opt ], 'daemon-kill' => [ '[-SIGNAL]', 'signal the lei-daemon', # "-C DIR" conflicts with -CHLD, here, and chdir makes no sense, here opt_dash('signal|s=s', '[0-9]+|(?:[A-Z][A-Z0-9]+)') ], 'daemon-pid' => [ '', 'show the PID of the lei-daemon' ], 'help' => [ '[SUBCOMMAND]', 'show help' ], -# XXX do we need this? -# 'git' => [ '[ANYTHING...]', 'git(1) wrapper', pass_through('git') ], - -'reorder-local-store-and-break-history' => [ '[REFNAME]', - 'rewrite git history in an attempt to improve compression', - qw(gc! C=s@) ], - +# TODO +#'reorder-local-store-and-break-history' => [ '[REFNAME]', +# 'rewrite git history in an attempt to improve compression', +# qw(gc!), @c_opt ], +#'fuse-mount' => [ 'PATHNAME', 'expose lei/store as Maildir(s)', @c_opt ], +# # internal commands are prefixed with '_' '_complete' => [ '[...]', 'internal shell completion helper', pass_through('everything') ], @@ -220,32 +317,45 @@ my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ]; # we use \x{a0} (non-breaking SP) to avoid wrapping in PublicInbox::LeiHelp my %OPTDESC = ( 'help|h' => 'show this built-in help', +'c=s@' => [ 'NAME=VALUE', 'set config option' ], 'C=s@' => [ 'DIR', 'chdir to specify to directory' ], 'quiet|q' => 'be quiet', 'lock=s@' => [ 'METHOD|dotlock|fcntl|flock|none', 'mbox(5) locking method(s) to use (default: fcntl,dotlock)' ], +'incremental! import' => 'import already seen IMAP and NNTP articles', 'globoff|g' => "do not match locations using '*?' wildcards ". "and\xa0'[]'\x{a0}ranges", +'invert-match|v' => 'select non-matching lines', +'color!' => 'disable color (for --format=text)', 'verbose|v+' => 'be more verbose', 'external!' => 'do not use externals', -'solve!' => 'do not attempt to reconstruct blobs from emails', +'mail!' => 'do not look in mail storage for OID', +'cwd!' => 'do not look in git repo of current working directory', +'oid-a|A=s' => 'pre-image OID', +'path-a|a=s' => 'pre-image pathname associated with OID', +'path-b|b=s' => 'post-image pathname associated with OID', +'git-dir=s@' => 'additional git repository to scan', +'dir|d=s inspect' => + 'specify a inboxdir, extindex topdir or Xapian shard', +'proxy=s' => [ 'PROTO://HOST[:PORT]', # shared with curl(1) + "proxy for (e.g. `socks5h://0:9050')" ], 'torsocks=s' => ['VAL|auto|no|yes', 'whether or not to wrap git and curl commands with torsocks'], 'no-torsocks' => 'alias for --torsocks=no', -'save-as=s' => ['NAME', 'save a search terms by given name'], +'save!' => "do not save a search for `lei up'", 'import-remote!' => 'do not memoize remote messages into local store', 'type=s' => [ 'any|mid|git', 'disambiguate type' ], 'dedupe|d=s' => ['STRATEGY|content|oid|mid|none', 'deduplication strategy'], -'show threads|t' => 'display entire thread a message belongs to', -'q threads|t+' => +'threads|t+' => 'return all messages in the same threads as the actual match(es)', 'want|w=s@' => [ 'PREFIX|dfpost|dfn', # common ones in help... 'search prefixes to extract (default: dfpost7)' ], +'uri p2q' => [ 'URI escape output' ], 'alert=s@' => ['CMD,:WINCH,:bell,', 'run command(s) or perform ops when done writing to output ' . @@ -259,18 +369,13 @@ my %OPTDESC = ( "or\xa0`-'\x{a0}for\x{a0}stdout)" ], 'mua=s' => [ 'CMD', "MUA to run on --output Maildir or mbox (e.g.\xa0`mutt\xa0-f\xa0%f')" ], +'new-only import' => 'only import new messages from IMAP source', -'show format|f=s' => [ 'OUT|plain|raw|html|mboxrd|mboxcl2|mboxcl', - 'message/object output format' ], -'mark format|f=s' => $stdin_formats, -'forget format|f=s' => $stdin_formats, - -'add-external inbox-version=i' => [ 'NUM|1|2', +'inbox-version=i' => [ 'NUM|1|2', 'force a public-inbox version with --mirror'], -'add-external mirror=s' => [ 'URL', 'mirror a public-inbox'], +'mirror=s' => [ 'URL', 'mirror a public-inbox'], # public-inbox-index options -'add-external jobs|j=i' => 'set parallelism when indexing after --mirror', 'fsync!' => 'speed up indexing after --mirror, risk index corruption', 'compact' => 'run compact index after mirroring', 'indexlevel|L=s' => [ 'LEVEL|full|medium|basic', @@ -279,28 +384,35 @@ my %OPTDESC = ( 'do not index messages larger than SIZE (default: infinity)' ], 'batch_size|batch-size=s' => [ 'SIZE', 'flush changes to OS after given number of bytes (default: 1m)' ], -'sequential_shard|sequential-shard' => +'sequential-shard' => 'index Xapian shards sequentially for slow storage', 'skip-docdata' => 'drop compatibility w/ public-inbox <1.6 to save ~1.5% space', -'q format|f=s' => [ +'format|f=s q' => [ 'OUT|maildir|mboxrd|mboxcl2|mboxcl|mboxo|html|json|jsonl|concatjson', 'specify output format, default depends on --output'], -'q exclude=s@' => [ 'LOCATION', +'exclude=s@ q' => [ 'LOCATION', 'exclude specified external(s) from search' ], -'q include|I=s@' => [ 'LOCATION', +'include|I=s@ q' => [ 'LOCATION', 'include specified external(s) in search' ], -'q only=s@' => [ 'LOCATION', +'only|O=s@ q' => [ 'LOCATION', 'only use specified external(s) for search' ], - -'q jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]', +'jobs=s q' => [ '[SEARCH_JOBS][,WRITER_JOBS]', 'control number of search and writer jobs' ], +'jobs|j=i add-external' => 'set parallelism when indexing after --mirror', -'import format|f=s' => $stdin_formats, +'in-format|F=s' => $stdin_formats, +'format|f=s ls-search' => ['OUT|json|jsonl|concatjson', + 'listing output format' ], +'l ls-search' => 'long listing format', +'l ls-watch' => 'long listing format', +'l ls-mail-source' => 'long listing format', +'url ls-mail-source' => 'show full URL of newsgroup or IMAP folder', +'format|f=s ls-external' => $ls_format, -'ls-query format|f=s' => $ls_format, -'ls-external format|f=s' => $ls_format, +'prune:s forget-search' => + ['TYPE|local|remote', 'prune all, remote or local folders' ], 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ], 'offset=i' => ['OFF', 'search result offset (default: 0)'], @@ -316,6 +428,12 @@ my %OPTDESC = ( 'remote' => 'limit operations to those requiring network access', 'remote!' => 'prevent operations requiring network access', +# up, refresh-mail-sync, export-kw +'all:s' => ['TYPE|local|remote', 'all remote or local folders' ], + +'remote-fudge-time=s' => [ 'INTERVAL', + 'look for mail INTERVAL older than the last successful query' ], + 'mid=s' => 'specify the Message-ID of a message', 'oid=s' => 'specify the git object ID of a message', @@ -328,7 +446,7 @@ my %OPTDESC = ( 'by-mid|mid:s' => [ 'MID', 'match only by Message-ID, ignoring contents' ], -'kw|keywords|flags!' => 'disable/enable importing flags', +'kw!' => 'disable/enable importing keywords (aka "flags")', # xargs, env, use "-0", git(1) uses "-z". We support z|0 everywhere 'z|0' => 'use NUL \\0 instead of newline (CR) to delimit lines', @@ -340,32 +458,29 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q); # internal workers +my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne v2w); # internal workers + +sub _drop_wq { + my ($self) = @_; + for my $wq (grep(defined, delete(@$self{@WQ_KEYS}))) { + $wq->wq_kill('-TERM'); + $wq->DESTROY; + } +} # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { my ($self, $code) = @_; + local $current_lei = $self; # make sure client sees stdout before exit $self->{1}->autoflush(1) if $self->{1}; - dump_and_clear_log(); - if (my $s = $self->{pkt_op_p} // $self->{sock}) { - send($s, "x_it $code", MSG_EOR); - } elsif ($self->{oneshot}) { - # don't want to end up using $? from child processes - for my $f (@WQ_KEYS) { - my $wq = delete $self->{$f} or next; - $wq->DESTROY; - } - # cleanup anything that has tempfiles or open file handles - %PATH2CFG = (); - delete @$self{qw(ovv dedupe sto cfg)}; - if (my $signum = ($code & 127)) { # usually SIGPIPE (13) - $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work - kill $signum, $$; - sleep(1) while 1; # wait for signal - } else { - $quit->($code >> 8); - } + stop_pager($self); + if ($self->{pkt_op_p}) { # worker => lei-daemon + $self->{pkt_op_p}->pkt_do('x_it', $code); + } elsif ($self->{sock}) { # lei->daemon => lei(1) client + send($self->{sock}, "x_it $code", MSG_EOR); + } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command + exit($code >> 8); } # else ignore if client disconnected } @@ -382,13 +497,17 @@ sub err ($;@) { sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } +sub qfin { # show message on finalization (LeiFinmsg) + my ($lei, $msg) = @_; + return if $lei->{opt}->{quiet}; + $lei->{fmsg} ? push(@{$lei->{fmsg}}, "$msg\n") : qerr($lei, $msg); +} + sub fail_handler ($;$$) { my ($lei, $code, $io) = @_; - for my $f (@WQ_KEYS) { - my $wq = delete $lei->{$f} or next; - $wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon - } + local $current_lei = $lei; close($io) if $io; # needed to avoid warnings on SIGPIPE + _drop_wq($lei); x_it($lei, $code // (1 << 8)); } @@ -396,38 +515,16 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers fail_handler($_[0], 13, delete $_[0]->{1}); } -# PublicInbox::OnDestroy callback for SIGINT to take out the entire pgid -sub sigint_reap { - my ($pgid) = @_; - dwaitpid($pgid) if kill('-INT', $pgid); -} - sub fail ($$;$) { - my ($self, $buf, $exit_code) = @_; - err($self, $buf) if defined $buf; - # calls fail_handler: - send($self->{pkt_op_p}, '!', MSG_EOR) if $self->{pkt_op_p}; + my ($self, $msg, $exit_code) = @_; + local $current_lei = $self; + $self->{failed}++; + warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; + $self->{pkt_op_p}->pkt_do('fail_handler') if $self->{pkt_op_p}; x_it($self, ($exit_code // 1) << 8); undef; } -sub check_input_format ($;$) { - my ($self, $files) = @_; - my $opt_key = 'in-format'; - my $fmt = $self->{opt}->{$opt_key}; - if (!$fmt) { - my $err = $files ? "regular file(s):\n@$files" : '--stdin'; - return fail($self, "--$opt_key unset for $err"); - } - require PublicInbox::MboxLock if $files; - return 1 if $fmt eq 'eml'; - # XXX: should this handle {gz,bz2,xz}? that's currently in LeiToMail - require PublicInbox::MboxReader; - PublicInbox::MboxReader->can($fmt) or - return fail($self, "--$opt_key=$fmt unrecognized"); - 1; -} - sub out ($;@) { my $self = shift; return if print { $self->{1} // return } @_; # likely @@ -441,37 +538,42 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) } sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error, $msg) = @_; # child_error is $? - $self->err($msg) if $msg; - if (my $s = $self->{pkt_op_p} // $self->{sock}) { - # send to the parent lei-daemon or to lei(1) client - send($s, "child_error $child_error", MSG_EOR); - } elsif (!$PublicInbox::DS::in_loop) { - $self->{child_error} = $child_error; + local $current_lei = $self; + $child_error ||= 1 << 8; + warn(substr($msg, -1, 1) eq "\n" ? $msg : "$msg\n") if defined $msg; + if ($self->{pkt_op_p}) { # to top lei-daemon + $self->{pkt_op_p}->pkt_do('child_error', $child_error); + } elsif ($self->{sock}) { # to lei(1) client + send($self->{sock}, "child_error $child_error", MSG_EOR); + } else { # non-lei admin command + $self->{child_error} ||= $child_error; } # else noop if client disconnected } sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; close(delete($self->{$fd})); # explicit close silences Perl warning - send($self->{pkt_op_p}, '|', MSG_EOR) if $self->{pkt_op_p}; + $self->{pkt_op_p}->pkt_do('sigpipe_handler') if $self->{pkt_op_p}; x_it($self, 13); } -sub lei_atfork_child { +sub _lei_atfork_child { my ($self, $persist) = @_; # we need to explicitly close things which are on stack if ($persist) { - my @io = delete @$self{qw(0 1 2 sock)}; - unless ($self->{oneshot}) { - close($_) for @io; + open $self->{3}, '<', '/' or die "open(/) $!"; + fchdir($self); + close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)})); + if (my $cfg = $self->{cfg}) { + delete @$cfg{qw(-lei_store -watches -lei_note_event)}; } - } else { - delete $self->{0}; - } - delete @$self{qw(cnv)}; - for (delete @$self{qw(3 old_1 au_done)}) { - close($_) if defined($_); + } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly + open STDERR, '+>&='.fileno($self->{2}) or warn "open $!"; + STDERR->autoflush(1); + POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!"; } + close($_) for (grep(defined, delete @$self{qw(old_1 au_done)})); + delete $self->{-socks}; if (my $op_c = delete $self->{pkt_op_c}) { close(delete $op_c->{sock}); } @@ -480,29 +582,92 @@ sub lei_atfork_child { } close $listener if $listener; undef $listener; + $dir_idle->force_close if $dir_idle; + undef $dir_idle; %PATH2CFG = (); + $MDIR2CFGPATH = {}; + eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush'; undef $errors_log; $quit = \&CORE::exit; + if (!$self->{-eml_noisy}) { # only "lei import" sets this atm + my $cb = $SIG{__WARN__} // \&CORE::warn; + $SIG{__WARN__} = sub { + $cb->(@_) unless PublicInbox::Eml::warn_ignore(@_) + }; + } + $SIG{TERM} = sub { exit(128 + 15) }; $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } -sub workers_start { - my ($lei, $wq, $ident, $jobs, $ops) = @_; - $ops = { - '!' => [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - %$ops - }; +sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die + my ($self) = @_; + if (my $op = delete $self->{pkt_op_c}) { # in case of die + $op->close; # PublicInbox::PktOp::close + } + my $pkt_op_p = delete($self->{pkt_op_p}) or return; + close $pkt_op_p->{op_p}; +} + +sub pkt_op_pair { + my ($self) = @_; + require PublicInbox::OnDestroy; require PublicInbox::PktOp; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); + my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self); + @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair; + $end; +} + +sub incr { + my ($self, $field, $nr) = @_; + $self->{counters}->{$field} += $nr; +} + +sub pkt_ops { + my ($lei, $ops) = @_; + $ops->{fail_handler} = [ $lei ]; + $ops->{sigpipe_handler} = [ $lei ]; + $ops->{x_it} = [ $lei ]; + $ops->{child_error} = [ $lei ]; + $ops->{incr} = [ $lei ]; + $ops; +} + +sub workers_start { + my ($lei, $wq, $jobs, $ops, $flds) = @_; + $ops //= {}; + ($wq->can('net_merge_all_done') && $lei->{auth}) and + $lei->{auth}->op_merge($ops, $wq, $lei); + pkt_ops($lei, $ops); + $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; + my $end = $lei->pkt_op_pair; + my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker"; + $flds->{lei} = $lei; + $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); delete $lei->{pkt_op_p}; - my $op = delete $lei->{pkt_op_c}; + my $op_c = delete $lei->{pkt_op_c}; + @$end = (); $lei->event_step_init; - # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op - $lei->{oneshot} ? $op : undef; + $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei); + ($op_c, $ops); +} + +# call this when we're ready to wait on events and yield to other clients +sub wait_wq_events { + my ($lei, $op_c, $ops) = @_; + my $wq1 = $lei->{wq1}; + ($wq1 && $wq1->can('net_merge_all_done') && !$lei->{auth}) and + $wq1->net_merge_all_done; + for my $wq (grep(defined, @$lei{qw(ikw pmd)})) { # auxiliary WQs + $wq->wq_close; + } + $op_c->{ops} = $ops; +} + +sub wq1_start { + my ($lei, $wq, $jobs) = @_; + my ($op_c, $ops) = workers_start($lei, $wq, $jobs // 1); + $lei->{wq1} = $wq; + wait_wq_events($lei, $op_c, $ops); # net_merge_all_done if !{auth} } sub _help { @@ -557,7 +722,15 @@ sub optparse ($$$) { my $ok; for my $o (@or) { if ($o =~ /\A--([a-z0-9\-]+)/) { - $ok = defined($OPT->{$1}); + my $sw = $1; + # assume pipe/regular file on stdin + # w/o args means stdin + if ($sw eq 'stdin' && !@$argv && + (-p $self->{0} || + -f _) && -r _) { + $OPT->{stdin} //= 1; + } + $ok = defined($OPT->{$sw}); last if $ok; } elsif (defined($argv->[$i])) { $ok = 1; @@ -579,25 +752,74 @@ sub optparse ($$$) { $err ? fail($self, "usage: lei $cmd $proto\nE: $err") : 1; } +sub _tmp_cfg { # for lei -c = ... + my ($self) = @_; + my $cfg = _lei_cfg($self, 1); + require File::Temp; + my $ft = File::Temp->new(TEMPLATE => 'lei_cfg-XXXX', TMPDIR => 1); + my $tmp = { '-f' => $ft->filename, -tmp => $ft }; + $ft->autoflush(1); + print $ft <{-f}: $!"); +[include] + path = $cfg->{-f} +EOM + $tmp = $self->{cfg} = bless { %$cfg, %$tmp }, ref($cfg); + for (@{$self->{opt}->{c}}) { + /\A([^=\.]+\.[^=]+)(?:=(.*))?\z/ or return fail($self, <=' +EOM + my $name = $1; + my $value = $2 // 1; + _config($self, '--add', $name, $value); + if (defined(my $v = $tmp->{$name})) { + if (ref($v) eq 'ARRAY') { + push @$v, $value; + } else { + $tmp->{$name} = [ $v, $value ]; + } + } else { + $tmp->{$name} = $value; + } + } +} + +sub lazy_cb ($$$) { + my ($self, $cmd, $pfx) = @_; + my $ucmd = $cmd; + $ucmd =~ tr/-/_/; + my $cb; + $cb = $self->can($pfx.$ucmd) and return $cb; + my $base = $ucmd; + $base =~ s/_([a-z])/\u$1/g; + my $pkg = "PublicInbox::Lei\u$base"; + ($INC{"PublicInbox/Lei\u$base.pm"} // eval("require $pkg")) ? + $pkg->can($pfx.$ucmd) : undef; +} + sub dispatch { my ($self, $cmd, @argv) = @_; + fchdir($self); + local %ENV = %{$self->{env}}; local $current_lei = $self; # for __WARN__ - dump_and_clear_log("from previous run\n"); + $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY return _help($self, 'no command given') unless defined($cmd); - while ($cmd eq '-C') { # do not support Getopt bundling for this - my $d = shift(@argv) // return fail($self, '-C DIRECTORY'); - push @{$self->{opt}->{C}}, $d; + # do not support Getopt bundling for this + while ($cmd eq '-C' || $cmd eq '-c') { + my $v = shift(@argv) // return fail($self, $cmd eq '-C' ? + '-C DIRECTORY' : '-c ='); + push @{$self->{opt}->{substr($cmd, 1, 1)}}, $v; $cmd = shift(@argv) // return _help($self, 'no command given'); } - my $func = "lei_$cmd"; - $func =~ tr/-/_/; - if (my $cb = __PACKAGE__->can($func)) { + if (my $cb = lazy_cb(__PACKAGE__, $cmd, 'lei_')) { optparse($self, $cmd, \@argv) or return; + $self->{opt}->{c} and (_tmp_cfg($self) // return); if (my $chdir = $self->{opt}->{C}) { for my $d (@$chdir) { next if $d eq ''; # same as git(1) chdir $d or return fail($self, "cd $d: $!"); } + open $self->{3}, '<', '.' or + return fail($self, "open . $!"); } $cb->($self, @argv); } elsif (grep(/\A-/, $cmd, @argv)) { # --help or -h only @@ -611,50 +833,59 @@ sub dispatch { sub _lei_cfg ($;$) { my ($self, $creat) = @_; + return $self->{cfg} if $self->{cfg}; my $f = _config_path($self); my @st = stat($f); my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size + my ($sto, $sto_dir, $watches, $lne); if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case return ($self->{cfg} = $cfg) if $cur_st eq $cfg->{-st}; + ($sto, $sto_dir, $watches, $lne) = + @$cfg{qw(-lei_store leistore.dir -watches + -lei_note_event)}; } if (!@st) { unless ($creat) { delete $self->{cfg}; return bless {}, 'PublicInbox::Config'; } - my (undef, $cfg_dir, undef) = File::Spec->splitpath($f); + my ($cfg_dir) = ($f =~ m!(.*?/)[^/]+\z!); -d $cfg_dir or mkpath($cfg_dir) or die "mkpath($cfg_dir): $!\n"; open my $fh, '>>', $f or die "open($f): $!\n"; @st = stat($fh) or die "fstat($f): $!\n"; $cur_st = pack('dd', $st[10], $st[7]); qerr($self, "# $f created") if $self->{cmd} ne 'config'; } - my $cfg = PublicInbox::Config::git_config_dump($f); - bless $cfg, 'PublicInbox::Config'; + my $cfg = PublicInbox::Config->git_config_dump($f, $self->{2}); $cfg->{-st} = $cur_st; $cfg->{'-f'} = $f; + if ($sto && canonpath_harder($sto_dir // store_path($self)) + eq canonpath_harder($cfg->{'leistore.dir'} // + store_path($self))) { + $cfg->{-lei_store} = $sto; + $cfg->{-lei_note_event} = $lne; + $cfg->{-watches} = $watches if $watches; + } + if (scalar(keys %PATH2CFG) > 5) { + # FIXME: use inotify/EVFILT_VNODE to detect unlinked configs + delete(@PATH2CFG{grep(!-f, keys %PATH2CFG)}); + } $self->{cfg} = $PATH2CFG{$f} = $cfg; + refresh_watches($self); + $cfg; } sub _lei_store ($;$) { my ($self, $creat) = @_; - my $cfg = _lei_cfg($self, $creat); + my $cfg = _lei_cfg($self, $creat) // return; $cfg->{-lei_store} //= do { require PublicInbox::LeiStore; - my $dir = $cfg->{'leistore.dir'}; - $dir //= $creat ? _store_path($self) : return; + my $dir = $cfg->{'leistore.dir'} // store_path($self); + return unless $creat || -d $dir; PublicInbox::LeiStore->new($dir, { creat => $creat }); }; } -sub lei_show { - my ($self, @argv) = @_; -} - -sub lei_mark { - my ($self, @argv) = @_; -} - sub _config { my ($self, @argv) = @_; my %env = (%{$self->{env}}, GIT_CONFIG => undef); @@ -664,61 +895,6 @@ sub _config { waitpid(spawn($cmd, \%env, \%rdr), 0); } -sub lei_config { - my ($self, @argv) = @_; - $self->{opt}->{'config-file'} and return fail $self, - "config file switches not supported by `lei config'"; - _config(@_); - x_it($self, $?) if $?; -} - -sub lei_import { - require PublicInbox::LeiImport; - PublicInbox::LeiImport->call(@_); -} - -sub lei_convert { - require PublicInbox::LeiConvert; - PublicInbox::LeiConvert->call(@_); -} - -sub lei_p2q { - require PublicInbox::LeiP2q; - PublicInbox::LeiP2q->call(@_); -} - -sub lei_init { - my ($self, $dir) = @_; - my $cfg = _lei_cfg($self, 1); - my $cur = $cfg->{'leistore.dir'}; - $dir //= _store_path($self); - $dir = rel2abs($self, $dir); - my @cur = stat($cur) if defined($cur); - $cur = File::Spec->canonpath($cur // $dir); - my @dir = stat($dir); - my $exists = "# leistore.dir=$cur already initialized" if @dir; - if (@cur) { - if ($cur eq $dir) { - _lei_store($self, 1)->done; - return qerr($self, $exists); - } - - # some folks like symlinks and bind mounts :P - if (@dir && "@cur[1,0]" eq "@dir[1,0]") { - lei_config($self, 'leistore.dir', $dir); - _lei_store($self, 1)->done; - return qerr($self, "$exists (as $cur)"); - } - return fail($self, <<""); -E: leistore.dir=$cur already initialized and it is not $dir - - } - lei_config($self, 'leistore.dir', $dir); - _lei_store($self, 1)->done; - $exists //= "# leistore.dir=$dir newly initialized"; - return qerr($self, $exists); -} - sub lei_daemon_pid { puts shift, $$ } sub lei_daemon_kill { @@ -727,8 +903,6 @@ sub lei_daemon_kill { kill($sig, $$) or fail($self, "kill($sig, $$): $!"); } -sub lei_help { _help($_[0]) } - # Shell completion helper. Used by lei-completion.bash and hopefully # other shells. Try to do as much here as possible to avoid redundancy # and improve maintainability. @@ -744,7 +918,7 @@ sub lei__complete { my ($proto, undef, @spec) = @$info; my $cur = pop @argv; my $re = defined($cur) ? qr/\A\Q$cur\E/ : qr/./; - if (substr($cur // '-', 0, 1) eq '-') { # --switches + if (substr(my $_cur = $cur // '-', 0, 1) eq '-') { # --switches # gross special case since the only git-config options # Consider moving to a table if we need more special cases # we use Getopt::Long for are the ones we reject, so these @@ -763,14 +937,14 @@ sub lei__complete { if (s/[:=].+\z//) { # req/optional args, e.g output|o=i } elsif (s/\+\z//) { # verbose|v+ } elsif (s/!\z//) { - # negation: solve! => no-solve|solve + # negation: mail! => no-mail|mail s/([\w\-]+)/$1|no-$1/g } map { my $x = length > 1 ? "--$_" : "-$_"; - $x eq $cur ? () : $x; + $x eq $_cur ? () : $x; } grep(!/_/, split(/\|/, $_, -1)) # help|h - } grep { $OPTDESC{"$cmd\t$_"} || $OPTDESC{$_} } @spec); + } grep { $OPTDESC{"$_\t$cmd"} || $OPTDESC{$_} } @spec); } elsif ($cmd eq 'config' && !@argv && !$CONFIG_KEYS{$cur}) { puts $self, grep(/$re/, keys %CONFIG_KEYS); } @@ -783,13 +957,12 @@ sub lei__complete { my @v = ref($v) ? split(/\|/, $v->[0]) : (); # get rid of ALL CAPS placeholder (e.g "OUT") # (TODO: completion for external paths) - shift(@v) if uc($v[0]) eq $v[0]; + shift(@v) if scalar(@v) && uc($v[0]) eq $v[0]; @v; - } grep(/\A(?:$cmd\t|)(?:[\w-]+\|)*$opt\b/, keys %OPTDESC); + } grep(/\A(?:[\w-]+\|)*$opt\b.*?(?:\t$cmd)?\z/, keys %OPTDESC); } - $cmd =~ tr/-/_/; - if (my $sub = $self->can("_complete_$cmd")) { - puts $self, $sub->($self, @argv, $cur); + if (my $cb = lazy_cb($self, $cmd, '_complete_')) { + puts $self, $cb->($self, @argv, $cur ? ($cur) : ()); } # TODO: URLs, pathnames, OIDs, MIDs, etc... See optparse() for # proto parsing. @@ -805,6 +978,9 @@ sub exec_buf ($$) { sub start_mua { my ($self) = @_; + if ($self->{ovv}->{fmt} =~ /\A(?:maildir)\z/) { # TODO: IMAP + refresh_watches($self); + } my $mua = $self->{opt}->{mua} // return; my $mfolder = $self->{ovv}->{dst}; my (@cmd, $replaced); @@ -818,14 +994,16 @@ sub start_mua { @cmd = map { $_ eq '%f' ? ($replaced = $mfolder) : $_ } @cmd; } push @cmd, $mfolder unless defined($replaced); - if (my $sock = $self->{sock}) { # lei(1) client process runs it - send($sock, exec_buf(\@cmd, {}), MSG_EOR); - } elsif ($self->{oneshot}) { - $self->{"pid.$self.$$"}->{spawn(\@cmd)} = \@cmd; + if ($self->{sock}) { # lei(1) client process runs it + # restore terminal: echo $query | lei q --stdin --mua=... + my $io = []; + $io->[0] = $self->{1} if $self->{opt}->{stdin} && -t $self->{1}; + send_exec_cmd($self, $io, \@cmd, {}); } if ($self->{lxs} && $self->{au_done}) { # kick wait_startq syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0)); } + return unless -t $self->{2}; # XXX how to determine non-TUI MUAs? $self->{opt}->{quiet} = 1; delete $self->{-progress}; delete $self->{opt}->{verbose}; @@ -841,14 +1019,11 @@ sub send_exec_cmd { # tell script/lei to execute a command sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail my ($self) = @_; my $alerts = $self->{opt}->{alert} // return; + my $sock = $self->{sock}; while (my $op = shift(@$alerts)) { if ($op eq ':WINCH') { # hit the process group that started the MUA - if ($self->{sock}) { - send($self->{sock}, '-WINCH', MSG_EOR); - } elsif ($self->{oneshot}) { - kill('-WINCH', $$); - } + send($sock, '-WINCH', MSG_EOR) if $sock; } elsif ($op eq ':bell') { out($self, "\a"); } elsif ($op =~ /(?{sock}) { - send($s, exec_buf($cmd, {}), MSG_EOR); - } elsif ($self->{oneshot}) { - $self->{"pid.$self.$$"}->{spawn($cmd)} = $cmd; - } + send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock; } else { - err($self, "W: unsupported --alert=$op"); # non-fatal + warn("W: unsupported --alert=$op\n"); # non-fatal } } } my %path_to_fd = ('/dev/stdin' => 0, '/dev/stdout' => 1, '/dev/stderr' => 2); -$path_to_fd{"/dev/fd/$_"} = $path_to_fd{"/proc/self/fd/$_"} for (0..2); -sub fopen { - my ($self, $mode, $path) = @_; - rel2abs($self, $path); +$path_to_fd{"/dev/fd/$_"} = $_ for (0..2); + +# this also normalizes the path +sub path_to_fd { + my ($self, $path) = @_; + $path = rel2abs($self, $path); $path =~ tr!/!/!s; - if (defined(my $fd = $path_to_fd{$path})) { - return $self->{$fd}; - } - if ($path =~ m!\A/(?:dev|proc/self)/fd/[0-9]+\z!) { - return fail($self, "cannot open $path from daemon"); - } - open my $fh, $mode, $path or return; - $fh; + $path_to_fd{$path} // ( + ($path =~ m!\A/(?:dev|proc/self)/fd/[0-9]+\z!) ? + fail($self, "cannot open $path from daemon") : -1 + ); } # caller needs to "-t $self->{1}" to check if tty sub start_pager { - my ($self) = @_; + my ($self, $new_env) = @_; my $fh = popen_rd([qw(git var GIT_PAGER)]); chomp(my $pager = <$fh> // ''); close($fh) or warn "`git var PAGER' error: \$?=$?"; return if $pager eq 'cat' || $pager eq ''; - my $new_env = { LESS => 'FRX', LV => '-c' }; - $new_env->{MORE} = 'FRX' if $^O eq 'freebsd'; + $new_env //= {}; + $new_env->{LESS} //= 'FRX'; + $new_env->{LV} //= '-c'; + $new_env->{MORE} = $new_env->{LESS} if $^O eq 'freebsd'; pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; my $pgr = [ undef, @$rdr{1, 2} ]; @@ -900,9 +1071,6 @@ sub start_pager { if ($self->{sock}) { # lei(1) process runs it delete @$new_env{keys %$env}; # only set iff unset send_exec_cmd($self, [ @$rdr{0..2} ], [$pager], $new_env); - } elsif ($self->{oneshot}) { - my $cmd = [$pager]; - $self->{"pid.$self.$$"}->{spawn($cmd, $new_env, $rdr)} = $cmd; } else { die 'BUG: start_pager w/o socket'; } @@ -912,12 +1080,23 @@ sub start_pager { $self->{pgr} = $pgr; } +# display a message for user before spawning full-screen $VISUAL +sub pgr_err { + my ($self, @msg) = @_; + return warn(@msg) unless $self->{sock} && -t $self->{2}; + start_pager($self, { LESS => 'RX' }); # no 'F' so we prompt + print { $self->{2} } @msg; + $self->{2}->autoflush(1); + stop_pager($self); + send($self->{sock}, 'wait', MSG_EOR); # wait for user to quit pager +} + sub stop_pager { my ($self) = @_; my $pgr = delete($self->{pgr}) or return; $self->{2} = $pgr->[2]; - # do not restore original stdout, just close it so we error out close(delete($self->{1})) if $self->{1}; + $self->{1} = $pgr->[1]; } sub accept_dispatch { # Listener {post_accept} callback @@ -929,19 +1108,17 @@ sub accept_dispatch { # Listener {post_accept} callback return send($sock, 'timed out waiting to recv FDs', MSG_EOR); # (4096 * 33) >MAX_ARG_STRLEN my @fds = $recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF - if (scalar(@fds) == 4) { - for my $i (0..3) { - my $fd = shift(@fds); - open($self->{$i}, '+<&=', $fd) and next; - send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); - } - } elsif (!defined($fds[0])) { + if (!defined($fds[0])) { warn(my $msg = "recv_cmd failed: $!"); return send($sock, $msg, MSG_EOR); } else { - return; + my $i = 0; + for my $fd (@fds) { + open($self->{$i++}, '+<&=', $fd) and next; + send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); + } + $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR) } - $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// @@ -949,31 +1126,16 @@ sub accept_dispatch { # Listener {post_accept} callback my ($argc, @argv) = split(/\0/, $buf, -1); undef $buf; my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); - if (chdir($self->{3})) { - local %ENV = %env; - $self->{env} = \%env; - eval { dispatch($self, @argv) }; - send($sock, $@, MSG_EOR) if $@; - } else { - send($sock, "fchdir: $!", MSG_EOR); # implicit close - } + $self->{env} = \%env; + eval { dispatch($self, @argv) }; + $self->fail($@) if $@; } sub dclose { my ($self) = @_; + local $current_lei = $self; delete $self->{-progress}; - for my $f (@WQ_KEYS) { - my $wq = delete $self->{$f} or next; - if ($wq->wq_kill) { - $wq->wq_close(0, undef, $self); - } elsif ($wq->wq_kill_old) { - $wq->wq_wait_old(undef, $self); - } - } - close(delete $self->{1}) if $self->{1}; # may reap_compress - if (my $sto = delete $self->{sto}) { - $sto->ipc_do('done'); - } + _drop_wq($self) if $self->{failed}; $self->close if $self->{-event_init_done}; # PublicInbox::DS::close } @@ -981,22 +1143,30 @@ sub dclose { sub event_step { my ($self) = @_; local %ENV = %{$self->{env}}; - my $sock = $self->{sock}; local $current_lei = $self; eval { - while (my @fds = $recv_cmd->($sock, my $buf, 4096)) { - if (scalar(@fds) == 1 && !defined($fds[0])) { - return if $! == EAGAIN; - next if $! == EINTR; - last if $! == ECONNRESET; - die "recvmsg: $!"; - } - for my $fd (@fds) { - open my $rfh, '+<&=', $fd; + my @fds = $recv_cmd->($self->{sock} // return, my $buf, 4096); + if (scalar(@fds) == 1 && !defined($fds[0])) { + return if $! == EAGAIN; + die "recvmsg: $!" if $! != ECONNRESET; + @fds = (); # for open loop below: + } + for (@fds) { open my $rfh, '+<&=', $_ } + if ($buf eq '') { + _drop_wq($self); # EOF, client disconnected + dclose($self); + $buf = 'TERM'; + } + if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) { + my $sig = "-$buf"; + for my $wq (grep(defined, @$self{@WQ_KEYS})) { + $wq->wq_kill($sig); } + } else { die "unrecognized client signal: $buf"; } - dclose($self); + my $s = $self->{-socks} // []; # lei up --all + @$s = grep { send($_, $buf, MSG_EOR) } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; @@ -1006,10 +1176,12 @@ sub event_step { sub event_step_init { my ($self) = @_; - return if $self->{-event_init_done}++; - if (my $sock = $self->{sock}) { # using DS->EventLoop - $self->SUPER::new($sock, EPOLLIN|EPOLLET); - } + my $sock = $self->{sock} or return; + $self->{-event_init_done} // do { # persist til $ops done + $sock->blocking(0); + $self->SUPER::new($sock, EPOLLIN); + $self->{-event_init_done} = $sock; + }; } sub noop {} @@ -1018,10 +1190,57 @@ sub oldset { $oldset } sub dump_and_clear_log { if (defined($errors_log) && -s STDIN && seek(STDIN, 0, SEEK_SET)) { - my @pfx = @_; - unshift(@pfx, "$errors_log ") if @pfx; - warn @pfx, do { local $/; }; - truncate(STDIN, 0) or warn "ftruncate ($errors_log): $!"; + openlog('lei-daemon', 'pid,nowait,nofatal,ndelay', 'user'); + chomp(my @lines = ); + truncate(STDIN, 0) or + syslog('warning', "ftruncate (%s): %m", $errors_log); + for my $l (@lines) { syslog('warning', '%s', $l) } + closelog(); # don't share across fork + } +} + +sub cfg2lei ($) { + my ($cfg) = @_; + my $lei = bless { env => { %{$cfg->{-env}} } }, __PACKAGE__; + open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!"; + open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!"; + open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!"; + open($lei->{3}, '<', '/') or die "open /: $!"; + my ($x, $y); + socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; + $lei->{sock} = $x; + require PublicInbox::LeiSelfSocket; + PublicInbox::LeiSelfSocket->new($y); # adds to event loop + $lei; +} + +sub dir_idle_handler ($) { # PublicInbox::DirIdle callback + my ($ev) = @_; # Linux::Inotify2::Event or duck type + my $fn = $ev->fullname; + if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file + my ($mdir, $nc, $bn) = ($1, $2, $3); + $nc = '' if $ev->IN_DELETE || $ev->IN_MOVED_FROM; + for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) { + my $cfg = $PATH2CFG{$f} // next; + eval { + my $lei = cfg2lei($cfg); + $lei->dispatch('note-event', + "maildir:$mdir", $nc, $bn, $fn); + }; + warn "E: note-event $f: $@\n" if $@; + } + } + if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) { + $ev->cancel; + } + if ($fn =~ m!\A(.+)/(?:new|cur)\z! && !-e $fn) { + delete $MDIR2CFGPATH->{$1}; + } + if (!-e $fn) { # config file or Maildir gone + for my $cfgpaths (values %$MDIR2CFGPATH) { + delete $cfgpaths->{$fn}; + } + delete $PATH2CFG{$fn}; } } @@ -1029,10 +1248,11 @@ sub dump_and_clear_log { sub lazy_start { my ($path, $errno, $narg) = @_; local ($errors_log, $listener); - ($errors_log) = ($path =~ m!\A(.+?/)[^/]+\z!); - $errors_log .= 'errors.log'; + my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!); + $errors_log = "$sock_dir/errors.log"; my $addr = pack_sockaddr_un($path); my $lk = bless { lock_path => $errors_log }, 'PublicInbox::Lock'; + umask(077) // die("umask(077): $!"); $lk->lock_acquire; socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!"; if ($errno == ECONNREFUSED || $errno == ENOENT) { @@ -1044,9 +1264,7 @@ sub lazy_start { $! = $errno; # allow interpolation to stringify in die die "connect($path): $!"; } - umask(077) // die("umask(077): $!"); bind($listener, $addr) or die "bind($path): $!"; - listen($listener, 1024) or die "listen: $!"; $lk->lock_release; undef $lk; my @st = stat($path) or die "stat($path): $!"; @@ -1064,27 +1282,28 @@ sub lazy_start { (Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg); require PublicInbox::Listener; - require PublicInbox::EOFpipe; + require PublicInbox::PktOp; (-p STDOUT) or die "E: stdout must be a pipe\n"; open(STDIN, '+>>', $errors_log) or die "open($errors_log): $!"; STDIN->autoflush(1); - dump_and_clear_log("from previous daemon process:\n"); + dump_and_clear_log(); POSIX::setsid() > 0 or die "setsid: $!"; my $pid = fork // die "fork: $!"; return if $pid; $0 = "lei-daemon $path"; local %PATH2CFG; + local $MDIR2CFGPATH; $listener->blocking(0); my $exit_code; my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch); local $quit = do { - pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; - PublicInbox::EOFpipe->new($eof_r, \&noop, undef); + my (undef, $eof_p) = PublicInbox::PktOp->pair; sub { $exit_code //= shift; + eval 'PublicInbox::LeiNoteEvent::flush_task()'; my $lis = $pil or exit($exit_code); - # closing eof_w triggers \&noop wakeup - $listener = $eof_w = $pil = $path = undef; + # closing eof_p triggers \&noop wakeup + $listener = $eof_p = $pil = $path = undef; $lis->close; # DS::close PublicInbox::DS->SetLoopTimeout(1000); }; @@ -1098,19 +1317,12 @@ sub lazy_start { USR1 => \&noop, USR2 => \&noop, }; - my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK); - local @SIG{keys %$sig} = values(%$sig) unless $sigfd; - undef $sig; - local $SIG{PIPE} = 'IGNORE'; - if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets - undef $sigfd; - PublicInbox::DS->SetLoopTimeout(5000); - } else { - # wake up every second to accept signals if we don't - # have signalfd or IO::KQueue: - PublicInbox::DS::sig_setmask($oldset); - PublicInbox::DS->SetLoopTimeout(1000); - } + require PublicInbox::DirIdle; + local $dir_idle = PublicInbox::DirIdle->new(sub { + # just rely on wakeup to hit PostLoopCallback set below + dir_idle_handler($_[0]) if $_[0]->fullname ne $path; + }); + $dir_idle->add_watches([$sock_dir]); PublicInbox::DS->SetPostLoopCallback(sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { @@ -1124,11 +1336,10 @@ sub lazy_start { $quit->(); } return 1 if defined($path); - my $now = now(); my $n = 0; for my $s (values %$dmap) { $s->can('busy') or next; - if ($s->busy($now)) { + if ($s->busy) { ++$n; } else { $s->close; @@ -1147,40 +1358,187 @@ sub lazy_start { open STDERR, '>&STDIN' or die "redirect stderr failed: $!"; open STDOUT, '>&STDIN' or die "redirect stdout failed: $!"; # $daemon pipe to `lei' closed, main loop begins: - PublicInbox::DS->EventLoop; + eval { PublicInbox::DS::event_loop($sig, $oldset) }; + warn "event loop error: $@\n" if $@; + # exit() may trigger waitpid via various DESTROY, ensure interruptible + PublicInbox::DS::sig_setmask($oldset); + dump_and_clear_log(); exit($exit_code // 0); } sub busy { 1 } # prevent daemon-shutdown if client is connected -# for users w/o Socket::Msghdr installed or Inline::C enabled -sub oneshot { - my ($main_pkg) = @_; - my $exit = $main_pkg->can('exit'); # caller may override exit() - local $quit = $exit if $exit; - local %PATH2CFG; - umask(077) // die("umask(077): $!"); - my $self = bless { - oneshot => 1, - 0 => *STDIN{GLOB}, - 1 => *STDOUT{GLOB}, - 2 => *STDERR{GLOB}, - env => \%ENV - }, __PACKAGE__; - dispatch($self, @ARGV); - x_it($self, $self->{child_error}) if $self->{child_error}; -} - # ensures stdout hits the FS before sock disconnects so a client # can immediately reread it sub DESTROY { my ($self) = @_; + if (my $counters = delete $self->{counters}) { + for my $k (sort keys %$counters) { + my $nr = $counters->{$k}; + $self->child_error(0, "$nr $k messages"); + } + } $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); - my $err = $?; - my $oneshot_pids = delete $self->{"pid.$self.$$"} or return; - waitpid($_, 0) for keys %$oneshot_pids; - $? = $err if $err; # preserve ->fail or ->x_it code + dump_and_clear_log(); + # preserve $? for ->fail or ->x_it code +} + +sub wq_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($wq, $lei) = @$arg; + local $current_lei = $lei; + my $err_type = $lei->{-err_type}; + $? and $lei->child_error($?, + $err_type ? "$err_type errors during $lei->{cmd}" : ()); + $lei->dclose; +} + +sub fchdir { + my ($lei) = @_; + my $dh = $lei->{3} // die 'BUG: lei->{3} (CWD) gone'; + chdir($dh) || die "fchdir: $!"; +} + +sub wq_eof { # EOF callback for main daemon + my ($lei) = @_; + local $current_lei = $lei; + delete $lei->{wq1} // return $lei->fail; # already failed +} + +sub watch_state_ok ($) { + my ($state) = $_[-1]; # $_[0] may be $self + $state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/; +} + +sub cancel_maildir_watch ($$) { + my ($d, $cfg_f) = @_; + my $w = delete $MDIR2CFGPATH->{$d}->{$cfg_f}; + scalar(keys %{$MDIR2CFGPATH->{$d}}) or + delete $MDIR2CFGPATH->{$d}; + for my $x (@{$w // []}) { $x->cancel } +} + +sub add_maildir_watch ($$) { + my ($d, $cfg_f) = @_; + if (!exists($MDIR2CFGPATH->{$d}->{$cfg_f})) { + my @w = $dir_idle->add_watches(["$d/cur", "$d/new"], 1); + push @{$MDIR2CFGPATH->{$d}->{$cfg_f}}, @w if @w; + } +} + +sub refresh_watches { + my ($lei) = @_; + $dir_idle or return; + my $cfg = _lei_cfg($lei) or return; + my $old = $cfg->{-watches}; + my $watches = $cfg->{-watches} //= {}; + my %seen; + my $cfg_f = $cfg->{'-f'}; + for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) { + my $url = substr($w, length('watch.'), -length('.state')); + require PublicInbox::LeiWatch; + $watches->{$url} //= PublicInbox::LeiWatch->new($url); + $seen{$url} = undef; + my $state = $cfg->get_1("watch.$url.state"); + if (!watch_state_ok($state)) { + warn("watch.$url.state=$state not supported\n"); + next; + } + if ($url =~ /\Amaildir:(.+)/i) { + my $d = canonpath_harder($1); + if ($state eq 'pause') { + cancel_maildir_watch($d, $cfg_f); + } else { + add_maildir_watch($d, $cfg_f); + } + } else { # TODO: imap/nntp/jmap + $lei->child_error(0, "E: watch $url not supported, yet") + } + } + + # add all known Maildir folders as implicit watches + my $lms = $lei->lms; + if ($lms) { + $lms->lms_write_prepare; + for my $d ($lms->folders('maildir:')) { + substr($d, 0, length('maildir:')) = ''; + + # fixup old bugs while we're iterating: + my $cd = canonpath_harder($d); + my $f = "maildir:$cd"; + $lms->rename_folder("maildir:$d", $f) if $d ne $cd; + next if $watches->{$f}; # may be set to pause + require PublicInbox::LeiWatch; + $watches->{$f} = PublicInbox::LeiWatch->new($f); + $seen{$f} = undef; + add_maildir_watch($cd, $cfg_f); + } + } + if ($old) { # cull old non-existent entries + for my $url (keys %$old) { + next if exists $seen{$url}; + delete $old->{$url}; + if ($url =~ /\Amaildir:(.+)/i) { + my $d = canonpath_harder($1); + cancel_maildir_watch($d, $cfg_f); + } else { # TODO: imap/nntp/jmap + $lei->child_error(0, "E: watch $url TODO"); + } + } + } + if (scalar keys %$watches) { + $cfg->{-env} //= { %{$lei->{env}}, PWD => '/' }; # for cfg2lei + } else { + delete $cfg->{-watches}; + } +} + +# TODO: support SHA-256 +sub git_oid { + my $eml = $_[-1]; + $eml->header_set($_) for @PublicInbox::Import::UNWANTED_HEADERS; + git_sha(1, $eml); +} + +sub lms { + my ($lei, $creat) = @_; + my $sto = $lei->{sto} // _lei_store($lei) // return; + require PublicInbox::LeiMailSync; + my $f = "$sto->{priv_eidx}->{topdir}/mail_sync.sqlite3"; + (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef; +} + +sub sto_done_request { + my ($lei, $sock) = @_; + local $current_lei = $lei; + eval { + if ($sock //= $lei->{sock}) { # issue, async wait + $lei->{sto}->wq_io_do('done', [ $sock ]); + } else { # forcibly wait + my $wait = $lei->{sto}->wq_do('done'); + } + }; + warn($@) if $@; +} + +sub cfg_dump ($$) { + my ($lei, $f) = @_; + my $ret = eval { PublicInbox::Config->git_config_dump($f, $lei->{2}) }; + return $ret if !$@; + warn($@); + undef; +} + +sub request_umask { + my ($lei) = @_; + my $s = $lei->{sock} // return; + send($s, 'umask', MSG_EOR) // die "send: $!"; + vec(my $rvec = '', fileno($s), 1) = 1; + select($rvec, undef, undef, 2) or die 'timeout waiting for umask'; + recv($s, my $v, 5, 0) // die "recv: $!"; + (my $u, $lei->{client_umask}) = unpack('AV', $v); + $u eq 'u' or warn "E: recv $v has no umask"; } 1;