# Copyright (C) 2020 all contributors # License: AGPL-3.0+ # Backend for `lei' (local email interface). Unlike the C10K-oriented # PublicInbox::Daemon, this is designed exclusively to handle trusted # local clients with read/write access to the FS and use as many # system resources as the local user has access to. package PublicInbox::LeiDaemon; use strict; use v5.10.1; use parent qw(PublicInbox::DS); use Getopt::Long (); use Errno qw(EAGAIN ECONNREFUSED ENOENT); use POSIX qw(setsid); use IO::Socket::UNIX; use IO::Handle (); use Sys::Syslog qw(syslog openlog); use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT); use PublicInbox::Sigfd; use PublicInbox::DS qw(now); use PublicInbox::Spawn qw(spawn); our $quit = sub { exit(shift // 0) }; my $glp = Getopt::Long::Parser->new; $glp->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: sub pass_through { () } # TODO: generate shell completion + help using %CMD and %OPTDESC # command => [ positional_args, 1-line description, Getopt::Long option spec ] our %CMD = ( # sorted in order of importance/use: 'query' => [ 'SEARCH-TERMS...', 'search for messages matching terms', qw( save-as=s output|o=s format|f=s dedupe|d=s thread|t augment|a limit|n=i sort|s=s reverse|r offset=i remote local! extinbox! since|after=s until|before=s) ], 'show' => [ '{MID|OID}', 'show a given object (Message-ID or object ID)', qw(type=s solve! format|f=s dedupe|d=s thread|t remote local!), pass_through('git show') ], 'add-extinbox' => [ 'URL-OR-PATHNAME', 'add/set priority of a publicinbox|extindex for extra matches', qw(prio=i) ], 'ls-extinbox' => [ '[FILTER]', 'list publicinbox|extindex locations', qw(format|f=s z local remote) ], 'forget-extinbox' => [ '{URL-OR-PATHNAME|--prune}', 'exclude further results from a publicinbox|extindex', qw(prune) ], 'ls-query' => [ '[FILTER]', 'list saved search queries', qw(name-only format|f=s z) ], 'rm-query' => [ 'QUERY_NAME', 'remove a saved search' ], 'mv-query' => [ qw(OLD_NAME NEW_NAME), 'rename a saved search' ], 'plonk' => [ '{--thread|--from=IDENT}', 'exclude mail matching From: or thread from non-Message-ID searches', qw(thread|t from|f=s mid=s oid=s) ], 'mark' => [ 'MESSAGE-FLAGS', 'set/unset flags on message(s) from stdin', qw(stdin| oid=s exact by-mid|mid:s) ], 'forget' => [ '--stdin', 'exclude message(s) on stdin from query results', qw(stdin| oid=s exact by-mid|mid:s) ], 'purge-mailsource' => [ '{URL-OR-PATHNAME|--all}', 'remove imported messages from IMAP, Maildirs, and MH', qw(exact! all jobs:i indexed) ], # code repos are used for `show' to solve blobs from patch mails 'add-coderepo' => [ 'PATHNAME', 'add or set priority of a git code repo', qw(prio=i) ], 'ls-coderepo' => [ '[FILTER]', 'list known code repos', qw(format|f=s z) ], 'forget-coderepo' => [ 'PATHNAME', 'stop using repo to solve blobs from patches', qw(prune) ], 'add-watch' => [ '[URL_OR_PATHNAME]', 'watch for new messages and flag changes', qw(import! flags! interval=s recursive|r exclude=s include=s) ], 'ls-watch' => [ '[FILTER]', 'list active watches with numbers and status', qw(format|f=s z) ], 'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote) ], 'resume-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote) ], 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', qw(prune) ], 'import' => [ '{URL_OR_PATHNAME|--stdin}', 'one-shot import/update from URL or filesystem', qw(stdin| limit|n=i offset=i recursive|r exclude=s include=s !flags), ], 'config' => [ '[ANYTHING...]', 'git-config(1) wrapper for ~/.config/lei/config', pass_through('git config') ], 'daemon-stop' => [ undef, 'stop the lei-daemon' ], 'daemon-pid' => [ undef, 'show the PID of the lei-daemon' ], 'help' => [ '[SUBCOMMAND]', 'show help' ], # XXX do we need this? # 'git' => [ '[ANYTHING...]', 'git(1) wrapper', pass_through('git') ], 'reorder-local-store-and-break-history' => [ '[REFNAME]', 'rewrite git history in an attempt to improve compression', 'gc!' ] ); # @CMD # switch descriptions, try to keep consistent across commands # $spec: Getopt::Long option specification # $spec => [@ALLOWED_VALUES (default is first), $description], # $spec => $description # "$SUB_COMMAND TAB $spec" => as above my $stdin_formats = [ qw(auto raw mboxrd mboxcl2 mboxcl mboxo), 'specify message input format' ]; my $ls_format = [ qw(plain json null), 'listing output format' ]; my $show_format = [ qw(plain raw html mboxrd mboxcl2 mboxcl), 'message/object output format' ]; my %OPTDESC = ( 'solve!' => 'do not attempt to reconstruct blobs from emails', 'save-as=s' => 'save a search terms by given name', 'type=s' => [qw(any mid git), 'disambiguate type' ], 'dedupe|d=s' => [qw(content oid mid), 'deduplication strategy'], 'thread|t' => 'every message in the same thread as the actual match(es)', 'augment|a' => 'augment --output destination instead of clobbering', 'output|o=s' => "destination (e.g. `/path/to/Maildir', or `-' for stdout)", 'mark format|f=s' => $stdin_formats, 'forget format|f=s' => $stdin_formats, 'query format|f=s' => [qw(maildir mboxrd mboxcl2 mboxcl html oid), q[specify output format (default: determined by --output)]], 'ls-query format|f=s' => $ls_format, 'ls-extinbox format|f=s' => $ls_format, 'limit|n=i' => 'integer limit on number of matches (default: 10000)', 'offset=i' => 'search result offset (default: 0)', 'sort|s=s@' => [qw(internaldate date relevance docid), "order of results `--output'-dependent)"], 'prio=i' => 'priority of query source', 'local' => 'limit operations to the local filesystem', 'local!' => 'exclude results from the local filesystem', 'remote' => 'limit operations to those requiring network access', 'remote!' => 'prevent operations requiring network access', 'mid=s' => 'specify the Message-ID of a message', 'oid=s' => 'specify the git object ID of a message', 'recursive|r' => 'scan directories/mailboxes/newsgroups recursively', 'exclude=s' => 'exclude mailboxes/newsgroups based on pattern', 'include=s' => 'include mailboxes/newsgroups based on pattern', 'exact' => 'operate on exact header matches only', 'exact!' => 'rely on content match instead of exact header matches', 'by-mid|mid:s' => 'match only by Message-ID, ignoring contents', 'jobs:i' => 'set parallelism level', ); # %OPTDESC sub x_it ($$) { # pronounced "exit" my ($client, $code) = @_; if (my $sig = ($code & 127)) { kill($sig, $client->{pid} // $$); } else { $code >>= 8; if (my $sock = $client->{sock}) { say $sock "exit=$code"; } else { # for oneshot $quit->($code); } } } sub emit ($$$) { my ($client, $channel, $buf) = @_; print { $client->{$channel} } $buf or warn "print FD[$channel]: $!"; } sub fail ($$;$) { my ($client, $buf, $exit_code) = @_; $buf .= "\n" unless $buf =~ /\n\z/s; emit($client, 2, $buf); x_it($client, ($exit_code // 1) << 8); undef; } sub _help ($;$) { my ($client, $channel) = @_; emit($client, $channel //= 1, < failure } sub assert_args ($$$;$@) { my ($client, $argv, $proto, $opt, @spec) = @_; $opt //= {}; push @spec, qw(help|h); $glp->getoptionsfromarray($argv, $opt, @spec) or return fail($client, 'bad arguments or options'); if ($opt->{help}) { _help($client); undef; } else { my ($nreq, $rest) = split(/;/, $proto); $nreq = (($nreq // '') =~ tr/$/$/); my $argc = scalar(@$argv); my $tot = ($rest // '') eq '@' ? $argc : ($proto =~ tr/$/$/); return 1 if $argc <= $tot && $argc >= $nreq; _help($client, 2); undef } } sub dispatch { my ($client, $cmd, @argv) = @_; local $SIG{__WARN__} = sub { emit($client, 2, "@_") }; local $SIG{__DIE__} = 'DEFAULT'; if (defined $cmd) { my $func = "lei_$cmd"; $func =~ tr/-/_/; if (my $cb = __PACKAGE__->can($func)) { $client->{cmd} = $cmd; $cb->($client, \@argv); } elsif (grep(/\A-/, $cmd, @argv)) { assert_args($client, [ $cmd, @argv ], ''); } else { fail($client, "`$cmd' is not an lei command"); } } else { _help($client, 2); } } sub lei_daemon_pid { my ($client, $argv) = @_; assert_args($client, $argv, '') and emit($client, 1, "$$\n"); } sub lei_DBG_pwd { my ($client, $argv) = @_; assert_args($client, $argv, '') and emit($client, 1, "$client->{env}->{PWD}\n"); } sub lei_DBG_cwd { my ($client, $argv) = @_; require Cwd; assert_args($client, $argv, '') and emit($client, 1, Cwd::cwd()."\n"); } sub lei_DBG_false { x_it($_[0], 1 << 8) } sub lei_daemon_stop { my ($client, $argv) = @_; assert_args($client, $argv, '') and $quit->(0); } sub lei_help { _help($_[0]) } sub reap_exec { # dwaitpid callback my ($client, $pid) = @_; x_it($client, $?); } sub lei_git { # support passing through random git commands my ($client, $argv) = @_; my %opt = map { $_ => $client->{$_} } (0..2); my $pid = spawn(['git', @$argv], $client->{env}, \%opt); PublicInbox::DS::dwaitpid($pid, \&reap_exec, $client); } sub accept_dispatch { # Listener {post_accept} callback my ($sock) = @_; # ignore other $sock->blocking(1); $sock->autoflush(1); my $client = { sock => $sock }; vec(my $rin = '', fileno($sock), 1) = 1; # `say $sock' triggers "die" in lei(1) for my $i (0..2) { if (select(my $rout = $rin, undef, undef, 1)) { my $fd = IO::FDPass::recv(fileno($sock)); if ($fd >= 0) { my $rdr = ($fd == 0 ? '<&=' : '>&='); if (open(my $fh, $rdr, $fd)) { $client->{$i} = $fh; } else { say $sock "open($rdr$fd) (FD=$i): $!"; return; } } else { say $sock "recv FD=$i: $!"; return; } } else { say $sock "timed out waiting to recv FD=$i"; return; } } # $ARGV_STR = join("]\0[", @ARGV); # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV); # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0"; my ($client_pid, $argv, $env) = do { local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2 chomp(my $line = <$sock>); split(/\0\0>/, $line, 3); }; my %env = map { split(/=/, $_, 2) } split(/\0/, $env); if (chdir($env{PWD})) { $client->{env} = \%env; $client->{pid} = $client_pid; eval { dispatch($client, split(/\]\0\[/, $argv)) }; say $sock $@ if $@; } else { say $sock "chdir($env{PWD}): $!"; # implicit close } } sub noop {} # lei(1) calls this when it can't connect sub lazy_start { my ($path, $err) = @_; if ($err == ECONNREFUSED) { unlink($path) or die "unlink($path): $!"; } elsif ($err != ENOENT) { die "connect($path): $!"; } require IO::FDPass; my $umask = umask(077) // die("umask(077): $!"); my $l = IO::Socket::UNIX->new(Local => $path, Listen => 1024, Type => SOCK_STREAM) or $err = $!; umask($umask) or die("umask(restore): $!"); $l or return die "bind($path): $err"; my @st = stat($path) or die "stat($path): $!"; my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; my $oldset = PublicInbox::Sigfd::block_signals(); my $pid = fork // die "fork: $!"; return if $pid; openlog($path, 'pid', 'user'); local $SIG{__DIE__} = sub { syslog('crit', "@_"); exit $! if $!; exit $? >> 8 if $? >> 8; exit 255; }; local $SIG{__WARN__} = sub { syslog('warning', "@_") }; open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n"; open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n"; open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n"; setsid(); $pid = fork // die "fork: $!"; return if $pid; $0 = "lei-daemon $path"; require PublicInbox::Listener; require PublicInbox::EOFpipe; $l->blocking(0); $eof_w->blocking(0); $eof_r->blocking(0); my $listener = PublicInbox::Listener->new($l, \&accept_dispatch, $l); my $exit_code; local $quit = sub { $exit_code //= shift; my $tmp = $listener or exit($exit_code); unlink($path) if defined($path); syswrite($eof_w, '.'); $l = $listener = $path = undef; $tmp->close if $tmp; # DS::close PublicInbox::DS->SetLoopTimeout(1000); }; PublicInbox::EOFpipe->new($eof_r, sub {}, undef); my $sig = { CHLD => \&PublicInbox::DS::enqueue_reap, QUIT => $quit, INT => $quit, TERM => $quit, HUP => \&noop, USR1 => \&noop, USR2 => \&noop, }; my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK); local %SIG = (%SIG, %$sig) if !$sigfd; if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets PublicInbox::DS->SetLoopTimeout(5000); } else { # wake up every second to accept signals if we don't # have signalfd or IO::KQueue: PublicInbox::Sigfd::sig_setmask($oldset); PublicInbox::DS->SetLoopTimeout(1000); } PublicInbox::DS->SetPostLoopCallback(sub { my ($dmap, undef) = @_; if (@st = defined($path) ? stat($path) : ()) { if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) { warn "$path dev/ino changed, quitting\n"; $path = undef; } } elsif (defined($path)) { warn "stat($path): $!, quitting ...\n"; undef $path; # don't unlink $quit->(); } return 1 if defined($path); my $now = now(); my $n = 0; for my $s (values %$dmap) { $s->can('busy') or next; if ($s->busy($now)) { ++$n; } else { $s->close; } } $n; # true: continue, false: stop }); PublicInbox::DS->EventLoop; exit($exit_code // 0); } # for users w/o IO::FDPass sub oneshot { dispatch({ 0 => *STDIN{IO}, 1 => *STDOUT{IO}, 2 => *STDERR{IO}, env => \%ENV }, @ARGV); } 1;