]> Sergey Matveev's repositories - public-inbox.git/commitdiff
lei: FD-passing and IPC basics
authorEric Wong <e@80x24.org>
Sun, 13 Dec 2020 22:38:48 +0000 (22:38 +0000)
committerEric Wong <e@80x24.org>
Sat, 19 Dec 2020 09:32:08 +0000 (09:32 +0000)
The start of lei, a Local Email Interface.  It'll support a
daemon via FD passing to avoid startup time penalties if
IO::FDPass is installed, but fall back to a slow one-shot mode
if not.

Compared to traditional socket daemon, FD passing should allow
us to eventually do stuff like run "git show" and still have
proper terminal support for pager and color.

MANIFEST
lib/PublicInbox/Daemon.pm
lib/PublicInbox/LeiDaemon.pm [new file with mode: 0644]
script/lei [new file with mode: 0755]
t/lei.t [new file with mode: 0644]

index ac4426065d914c6882840f6ca1b7fd1333e3d21d..7536b7c28fa6a219488e46b2483144713a43bbb4 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -159,6 +159,7 @@ lib/PublicInbox/InboxIdle.pm
 lib/PublicInbox/InboxWritable.pm
 lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
+lib/PublicInbox/LeiDaemon.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
 lib/PublicInbox/Lock.pm
@@ -226,6 +227,7 @@ sa_config/Makefile
 sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
 sa_config/user/.spamassassin/user_prefs
+script/lei
 script/public-inbox-compact
 script/public-inbox-convert
 script/public-inbox-edit
@@ -316,6 +318,7 @@ t/indexlevels-mirror.t
 t/init.t
 t/iso-2202-jp.eml
 t/kqnotify.t
+t/lei.t
 t/linkify.t
 t/main-bin/spamc
 t/mda-mime.eml
index a2171535c9ae2a3656c498318f6fa534ecd48691..6b92b60dac8ee7e290279c7261717ccab440596f 100644 (file)
@@ -1,7 +1,9 @@
 # Copyright (C) 2015-2020 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# contains common daemon code for the httpd, imapd, and nntpd servers.
-# This may be used for read-only IMAP server if we decide to implement it.
+#
+# Contains common daemon code for the httpd, imapd, and nntpd servers
+# and designed for handling thousands of untrusted clients over slow
+# and/or lossy connections.
 package PublicInbox::Daemon;
 use strict;
 use warnings;
diff --git a/lib/PublicInbox/LeiDaemon.pm b/lib/PublicInbox/LeiDaemon.pm
new file mode 100644 (file)
index 0000000..ae40b3a
--- /dev/null
@@ -0,0 +1,303 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Backend for `lei' (local email interface).  Unlike the C10K-oriented
+# PublicInbox::Daemon, this is designed exclusively to handle trusted
+# local clients with read/write access to the FS and use as many
+# system resources as the local user has access to.
+package PublicInbox::LeiDaemon;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use Getopt::Long ();
+use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use POSIX qw(setsid);
+use IO::Socket::UNIX;
+use IO::Handle ();
+use Sys::Syslog qw(syslog openlog);
+use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Sigfd;
+use PublicInbox::DS qw(now);
+use PublicInbox::Spawn qw(spawn);
+our $quit = sub { exit(shift // 0) };
+my $glp = Getopt::Long::Parser->new;
+$glp->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
+
+sub x_it ($$) { # pronounced "exit"
+       my ($client, $code) = @_;
+       if (my $sig = ($code & 127)) {
+               kill($sig, $client->{pid} // $$);
+       } else {
+               $code >>= 8;
+               if (my $sock = $client->{sock}) {
+                       say $sock "exit=$code";
+               } else { # for oneshot
+                       $quit->($code);
+               }
+       }
+}
+
+sub emit ($$$) {
+       my ($client, $channel, $buf) = @_;
+       print { $client->{$channel} } $buf or warn "print FD[$channel]: $!";
+}
+
+sub fail ($$;$) {
+       my ($client, $buf, $exit_code) = @_;
+       $buf .= "\n" unless $buf =~ /\n\z/s;
+       emit($client, 2, $buf);
+       x_it($client, ($exit_code // 1) << 8);
+       undef;
+}
+
+sub _help ($;$) {
+       my ($client, $channel) = @_;
+       emit($client, $channel //= 1, <<EOF);
+usage: lei COMMAND [OPTIONS]
+
+...
+EOF
+       x_it($client, $channel == 2 ? 1 << 8 : 0); # stderr => failure
+}
+
+sub assert_args ($$$;$@) {
+       my ($client, $argv, $proto, $opt, @spec) = @_;
+       $opt //= {};
+       push @spec, qw(help|h);
+       $glp->getoptionsfromarray($argv, $opt, @spec) or
+               return fail($client, 'bad arguments or options');
+       if ($opt->{help}) {
+               _help($client);
+               undef;
+       } else {
+               my ($nreq, $rest) = split(/;/, $proto);
+               $nreq = (($nreq // '') =~ tr/$/$/);
+               my $argc = scalar(@$argv);
+               my $tot = ($rest // '') eq '@' ? $argc : ($proto =~ tr/$/$/);
+               return 1 if $argc <= $tot && $argc >= $nreq;
+               _help($client, 2);
+               undef
+       }
+}
+
+sub dispatch {
+       my ($client, $cmd, @argv) = @_;
+       local $SIG{__WARN__} = sub { emit($client, 2, "@_") };
+       local $SIG{__DIE__} = 'DEFAULT';
+       if (defined $cmd) {
+               my $func = "lei_$cmd";
+               $func =~ tr/-/_/;
+               if (my $cb = __PACKAGE__->can($func)) {
+                       $client->{cmd} = $cmd;
+                       $cb->($client, \@argv);
+               } elsif (grep(/\A-/, $cmd, @argv)) {
+                       assert_args($client, [ $cmd, @argv ], '');
+               } else {
+                       fail($client, "`$cmd' is not an lei command");
+               }
+       } else {
+               _help($client, 2);
+       }
+}
+
+sub lei_daemon_pid {
+       my ($client, $argv) = @_;
+       assert_args($client, $argv, '') and emit($client, 1, "$$\n");
+}
+
+sub lei_DBG_pwd {
+       my ($client, $argv) = @_;
+       assert_args($client, $argv, '') and
+               emit($client, 1, "$client->{env}->{PWD}\n");
+}
+
+sub lei_DBG_cwd {
+       my ($client, $argv) = @_;
+       require Cwd;
+       assert_args($client, $argv, '') and emit($client, 1, Cwd::cwd()."\n");
+}
+
+sub lei_DBG_false { x_it($_[0], 1 << 8) }
+
+sub lei_daemon_stop {
+       my ($client, $argv) = @_;
+       assert_args($client, $argv, '') and $quit->(0);
+}
+
+sub lei_help { _help($_[0]) }
+
+sub reap_exec { # dwaitpid callback
+       my ($client, $pid) = @_;
+       x_it($client, $?);
+}
+
+sub lei_git { # support passing through random git commands
+       my ($client, $argv) = @_;
+       my %opt = map { $_ => $client->{$_} } (0..2);
+       my $pid = spawn(['git', @$argv], $client->{env}, \%opt);
+       PublicInbox::DS::dwaitpid($pid, \&reap_exec, $client);
+}
+
+sub accept_dispatch { # Listener {post_accept} callback
+       my ($sock) = @_; # ignore other
+       $sock->blocking(1);
+       $sock->autoflush(1);
+       my $client = { sock => $sock };
+       vec(my $rin = '', fileno($sock), 1) = 1;
+       # `say $sock' triggers "die" in lei(1)
+       for my $i (0..2) {
+               if (select(my $rout = $rin, undef, undef, 1)) {
+                       my $fd = IO::FDPass::recv(fileno($sock));
+                       if ($fd >= 0) {
+                               my $rdr = ($fd == 0 ? '<&=' : '>&=');
+                               if (open(my $fh, $rdr, $fd)) {
+                                       $client->{$i} = $fh;
+                               } else {
+                                       say $sock "open($rdr$fd) (FD=$i): $!";
+                                       return;
+                               }
+                       } else {
+                               say $sock "recv FD=$i: $!";
+                               return;
+                       }
+               } else {
+                       say $sock "timed out waiting to recv FD=$i";
+                       return;
+               }
+       }
+       # $ARGV_STR = join("]\0[", @ARGV);
+       # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
+       # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
+       my ($client_pid, $argv, $env) = do {
+               local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
+               chomp(my $line = <$sock>);
+               split(/\0\0>/, $line, 3);
+       };
+       my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
+       if (chdir($env{PWD})) {
+               $client->{env} = \%env;
+               $client->{pid} = $client_pid;
+               eval { dispatch($client, split(/\]\0\[/, $argv)) };
+               say $sock $@ if $@;
+       } else {
+               say $sock "chdir($env{PWD}): $!"; # implicit close
+       }
+}
+
+sub noop {}
+
+# lei(1) calls this when it can't connect
+sub lazy_start ($$) {
+       my ($path, $err) = @_;
+       if ($err == ECONNREFUSED) {
+               unlink($path) or die "unlink($path): $!";
+       } elsif ($err != ENOENT) {
+               die "connect($path): $!";
+       }
+       my $umask = umask(077) // die("umask(077): $!");
+       my $l = IO::Socket::UNIX->new(Local => $path,
+                                       Listen => 1024,
+                                       Type => SOCK_STREAM) or
+               $err = $!;
+       umask($umask) or die("umask(restore): $!");
+       $l or return $err;
+       my @st = stat($path) or die "stat($path): $!";
+       my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
+       pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
+       my $oldset = PublicInbox::Sigfd::block_signals();
+       my $pid = fork // die "fork: $!";
+       if ($pid) {
+               PublicInbox::Sigfd::sig_setmask($oldset);
+               return; # client will connect to $path
+       }
+       openlog($path, 'pid', 'user');
+       local $SIG{__DIE__} = sub {
+               syslog('crit', "@_");
+               exit $! if $!;
+               exit $? >> 8 if $? >> 8;
+               exit 255;
+       };
+       local $SIG{__WARN__} = sub { syslog('warning', "@_") };
+       open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n";
+       open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n";
+       open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n";
+       setsid();
+       $pid = fork // die "fork: $!";
+       exit if $pid;
+       $0 = "lei-daemon $path";
+       require PublicInbox::Listener;
+       require PublicInbox::EOFpipe;
+       $l->blocking(0);
+       $eof_w->blocking(0);
+       $eof_r->blocking(0);
+       my $listener = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
+       my $exit_code;
+       local $quit = sub {
+               $exit_code //= shift;
+               my $tmp = $listener or exit($exit_code);
+               unlink($path) if defined($path);
+               syswrite($eof_w, '.');
+               $l = $listener = $path = undef;
+               $tmp->close if $tmp; # DS::close
+               PublicInbox::DS->SetLoopTimeout(1000);
+       };
+       PublicInbox::EOFpipe->new($eof_r, sub {}, undef);
+       my $sig = {
+               CHLD => \&PublicInbox::DS::enqueue_reap,
+               QUIT => $quit,
+               INT => $quit,
+               TERM => $quit,
+               HUP => \&noop,
+               USR1 => \&noop,
+               USR2 => \&noop,
+       };
+       my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
+       local %SIG = (%SIG, %$sig) if !$sigfd;
+       if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
+               PublicInbox::DS->SetLoopTimeout(5000);
+       } else {
+               # wake up every second to accept signals if we don't
+               # have signalfd or IO::KQueue:
+               PublicInbox::Sigfd::sig_setmask($oldset);
+               PublicInbox::DS->SetLoopTimeout(1000);
+       }
+       PublicInbox::DS->SetPostLoopCallback(sub {
+               my ($dmap, undef) = @_;
+               if (@st = defined($path) ? stat($path) : ()) {
+                       if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
+                               warn "$path dev/ino changed, quitting\n";
+                               $path = undef;
+                       }
+               } elsif (defined($path)) {
+                       warn "stat($path): $!, quitting ...\n";
+                       undef $path; # don't unlink
+                       $quit->();
+               }
+               return 1 if defined($path);
+               my $now = now();
+               my $n = 0;
+               for my $s (values %$dmap) {
+                       $s->can('busy') or next;
+                       if ($s->busy($now)) {
+                               ++$n;
+                       } else {
+                               $s->close;
+                       }
+               }
+               $n; # true: continue, false: stop
+       });
+       PublicInbox::DS->EventLoop;
+       exit($exit_code // 0);
+}
+
+# for users w/o IO::FDPass
+sub oneshot {
+       dispatch({
+               0 => *STDIN{IO},
+               1 => *STDOUT{IO},
+               2 => *STDERR{IO},
+               env => \%ENV
+       }, @ARGV);
+}
+
+1;
diff --git a/script/lei b/script/lei
new file mode 100755 (executable)
index 0000000..1b5af3a
--- /dev/null
@@ -0,0 +1,58 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Cwd qw(cwd);
+use IO::Socket::UNIX;
+
+if (eval { require IO::FDPass; 1 }) { # use daemon to reduce load time
+       my $path = do {
+               my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
+               if ($runtime_dir eq '/lei') {
+                       require File::Spec;
+                       $runtime_dir = File::Spec->tmpdir."/lei-$<";
+               }
+               unless (-d $runtime_dir && -w _) {
+                       require File::Path;
+                       File::Path::mkpath($runtime_dir, 0, 0700);
+               }
+               "$runtime_dir/sock";
+       };
+       my $sock = IO::Socket::UNIX->new(Peer => $path, Type => SOCK_STREAM);
+       unless ($sock) { # start the daemon if not started
+               my $err = $!;
+               require PublicInbox::LeiDaemon;
+               $err = PublicInbox::LeiDaemon::lazy_start($path, $err);
+               # try connecting again anyways, unlink+bind may be racy
+               $sock = IO::Socket::UNIX->new(Peer => $path,
+                                               Type => SOCK_STREAM) // die
+                       "connect($path): $! (bind($path): $err)";
+       }
+       my $pwd = $ENV{PWD};
+       my $cwd = cwd();
+       if ($pwd) { # prefer ENV{PWD} if it's a symlink to real cwd
+               my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!\n";
+               my @st_pwd = stat($pwd);
+               # make sure st_dev/st_ino match for {PWD} to be valid
+               $pwd = $cwd if (!@st_pwd || $st_pwd[1] != $st_cwd[1] ||
+                                       $st_pwd[0] != $st_cwd[0]);
+       } else {
+               $pwd = $cwd;
+       }
+       local $ENV{PWD} = $pwd;
+       $sock->autoflush(1);
+       IO::FDPass::send(fileno($sock), $_) for (0..2);
+       my $buf = "$$\0\0>" . join("]\0[", @ARGV) . "\0\0>";
+       while (my ($k, $v) = each %ENV) { $buf .= "$k=$v\0" }
+       $buf .= "\0\0";
+       print $sock $buf or die "print(sock, buf): $!";
+       local $/ = "\n";
+       while (my $line = <$sock>) {
+               $line =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
+               die $line;
+       }
+} else { # for systems lacking IO::FDPass
+       require PublicInbox::LeiDaemon;
+       PublicInbox::LeiDaemon::oneshot();
+}
diff --git a/t/lei.t b/t/lei.t
new file mode 100644 (file)
index 0000000..feee927
--- /dev/null
+++ b/t/lei.t
@@ -0,0 +1,80 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use PublicInbox::Config;
+my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
+require_mods(qw(DBD::SQLite Search::Xapian));
+my ($home, $for_destroy) = tmpdir();
+my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
+
+SKIP: {
+       require_mods('IO::FDPass', 51);
+       local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
+       mkdir "$home/xdg_run", 0700 or BAIL_OUT "mkdir: $!";
+       my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/sock";
+
+       ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid');
+       is($err, '', 'no error from daemon-pid');
+       like($out, qr/\A[0-9]+\n\z/s, 'pid returned') or BAIL_OUT;
+       chomp(my $pid = $out);
+       ok(kill(0, $pid), 'pid is valid');
+       ok(-S $sock, 'sock created');
+
+       ok(!run_script([qw(lei)], undef, $opt), 'no args fails');
+       is($? >> 8, 1, '$? is 1');
+       is($out, '', 'nothing in stdout');
+       like($err, qr/^usage:/sm, 'usage in stderr');
+
+       for my $arg (['-h'], ['--help'], ['help'], [qw(daemon-pid --help)]) {
+               $out = $err = '';
+               ok(run_script(['lei', @$arg], undef, $opt), "lei @$arg");
+               like($out, qr/^usage:/sm, "usage in stdout (@$arg)");
+               is($err, '', "nothing in stderr (@$arg)");
+       }
+
+       ok(!run_script([qw(lei DBG-false)], undef, $opt), 'false(1) emulation');
+       is($? >> 8, 1, '$? set correctly');
+       is($err, '', 'no error from false(1) emulation');
+
+       for my $arg ([''], ['--halp'], ['halp'], [qw(daemon-pid --halp)]) {
+               $out = $err = '';
+               ok(!run_script(['lei', @$arg], undef, $opt), "lei @$arg");
+               is($? >> 8, 1, '$? set correctly');
+               isnt($err, '', 'something in stderr');
+               is($out, '', 'nothing in stdout');
+       }
+
+       $out = '';
+       ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid');
+       chomp(my $pid_again = $out);
+       is($pid, $pid_again, 'daemon-pid idempotent');
+
+       ok(run_script([qw(lei daemon-stop)], undef, $opt), 'daemon-stop');
+       is($out, '', 'no output from daemon-stop');
+       is($err, '', 'no error from daemon-stop');
+       for (0..100) {
+               kill(0, $pid) or last;
+               tick();
+       }
+       ok(!-S $sock, 'sock gone');
+       ok(!kill(0, $pid), 'pid gone after stop');
+
+       ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid');
+       chomp(my $new_pid = $out);
+       ok(kill(0, $new_pid), 'new pid is running');
+       ok(-S $sock, 'sock exists again');
+       unlink $sock or BAIL_OUT "unlink $!";
+       for (0..100) {
+               kill('CHLD', $new_pid) or last;
+               tick();
+       }
+       ok(!kill(0, $new_pid), 'daemon exits after unlink');
+};
+
+require_ok 'PublicInbox::LeiDaemon';
+
+done_testing;