]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/LeiDaemon.pm
ae40b3a66510640bba1d978ddb2175be47c2d976
[public-inbox.git] / lib / PublicInbox / LeiDaemon.pm
1 # Copyright (C) 2020 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Backend for `lei' (local email interface).  Unlike the C10K-oriented
5 # PublicInbox::Daemon, this is designed exclusively to handle trusted
6 # local clients with read/write access to the FS and use as many
7 # system resources as the local user has access to.
8 package PublicInbox::LeiDaemon;
9 use strict;
10 use v5.10.1;
11 use parent qw(PublicInbox::DS);
12 use Getopt::Long ();
13 use Errno qw(EAGAIN ECONNREFUSED ENOENT);
14 use POSIX qw(setsid);
15 use IO::Socket::UNIX;
16 use IO::Handle ();
17 use Sys::Syslog qw(syslog openlog);
18 use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
19 use PublicInbox::Sigfd;
20 use PublicInbox::DS qw(now);
21 use PublicInbox::Spawn qw(spawn);
22 our $quit = sub { exit(shift // 0) };
23 my $glp = Getopt::Long::Parser->new;
24 $glp->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
25
26 sub x_it ($$) { # pronounced "exit"
27         my ($client, $code) = @_;
28         if (my $sig = ($code & 127)) {
29                 kill($sig, $client->{pid} // $$);
30         } else {
31                 $code >>= 8;
32                 if (my $sock = $client->{sock}) {
33                         say $sock "exit=$code";
34                 } else { # for oneshot
35                         $quit->($code);
36                 }
37         }
38 }
39
40 sub emit ($$$) {
41         my ($client, $channel, $buf) = @_;
42         print { $client->{$channel} } $buf or warn "print FD[$channel]: $!";
43 }
44
45 sub fail ($$;$) {
46         my ($client, $buf, $exit_code) = @_;
47         $buf .= "\n" unless $buf =~ /\n\z/s;
48         emit($client, 2, $buf);
49         x_it($client, ($exit_code // 1) << 8);
50         undef;
51 }
52
53 sub _help ($;$) {
54         my ($client, $channel) = @_;
55         emit($client, $channel //= 1, <<EOF);
56 usage: lei COMMAND [OPTIONS]
57
58 ...
59 EOF
60         x_it($client, $channel == 2 ? 1 << 8 : 0); # stderr => failure
61 }
62
63 sub assert_args ($$$;$@) {
64         my ($client, $argv, $proto, $opt, @spec) = @_;
65         $opt //= {};
66         push @spec, qw(help|h);
67         $glp->getoptionsfromarray($argv, $opt, @spec) or
68                 return fail($client, 'bad arguments or options');
69         if ($opt->{help}) {
70                 _help($client);
71                 undef;
72         } else {
73                 my ($nreq, $rest) = split(/;/, $proto);
74                 $nreq = (($nreq // '') =~ tr/$/$/);
75                 my $argc = scalar(@$argv);
76                 my $tot = ($rest // '') eq '@' ? $argc : ($proto =~ tr/$/$/);
77                 return 1 if $argc <= $tot && $argc >= $nreq;
78                 _help($client, 2);
79                 undef
80         }
81 }
82
83 sub dispatch {
84         my ($client, $cmd, @argv) = @_;
85         local $SIG{__WARN__} = sub { emit($client, 2, "@_") };
86         local $SIG{__DIE__} = 'DEFAULT';
87         if (defined $cmd) {
88                 my $func = "lei_$cmd";
89                 $func =~ tr/-/_/;
90                 if (my $cb = __PACKAGE__->can($func)) {
91                         $client->{cmd} = $cmd;
92                         $cb->($client, \@argv);
93                 } elsif (grep(/\A-/, $cmd, @argv)) {
94                         assert_args($client, [ $cmd, @argv ], '');
95                 } else {
96                         fail($client, "`$cmd' is not an lei command");
97                 }
98         } else {
99                 _help($client, 2);
100         }
101 }
102
103 sub lei_daemon_pid {
104         my ($client, $argv) = @_;
105         assert_args($client, $argv, '') and emit($client, 1, "$$\n");
106 }
107
108 sub lei_DBG_pwd {
109         my ($client, $argv) = @_;
110         assert_args($client, $argv, '') and
111                 emit($client, 1, "$client->{env}->{PWD}\n");
112 }
113
114 sub lei_DBG_cwd {
115         my ($client, $argv) = @_;
116         require Cwd;
117         assert_args($client, $argv, '') and emit($client, 1, Cwd::cwd()."\n");
118 }
119
120 sub lei_DBG_false { x_it($_[0], 1 << 8) }
121
122 sub lei_daemon_stop {
123         my ($client, $argv) = @_;
124         assert_args($client, $argv, '') and $quit->(0);
125 }
126
127 sub lei_help { _help($_[0]) }
128
129 sub reap_exec { # dwaitpid callback
130         my ($client, $pid) = @_;
131         x_it($client, $?);
132 }
133
134 sub lei_git { # support passing through random git commands
135         my ($client, $argv) = @_;
136         my %opt = map { $_ => $client->{$_} } (0..2);
137         my $pid = spawn(['git', @$argv], $client->{env}, \%opt);
138         PublicInbox::DS::dwaitpid($pid, \&reap_exec, $client);
139 }
140
141 sub accept_dispatch { # Listener {post_accept} callback
142         my ($sock) = @_; # ignore other
143         $sock->blocking(1);
144         $sock->autoflush(1);
145         my $client = { sock => $sock };
146         vec(my $rin = '', fileno($sock), 1) = 1;
147         # `say $sock' triggers "die" in lei(1)
148         for my $i (0..2) {
149                 if (select(my $rout = $rin, undef, undef, 1)) {
150                         my $fd = IO::FDPass::recv(fileno($sock));
151                         if ($fd >= 0) {
152                                 my $rdr = ($fd == 0 ? '<&=' : '>&=');
153                                 if (open(my $fh, $rdr, $fd)) {
154                                         $client->{$i} = $fh;
155                                 } else {
156                                         say $sock "open($rdr$fd) (FD=$i): $!";
157                                         return;
158                                 }
159                         } else {
160                                 say $sock "recv FD=$i: $!";
161                                 return;
162                         }
163                 } else {
164                         say $sock "timed out waiting to recv FD=$i";
165                         return;
166                 }
167         }
168         # $ARGV_STR = join("]\0[", @ARGV);
169         # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
170         # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
171         my ($client_pid, $argv, $env) = do {
172                 local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
173                 chomp(my $line = <$sock>);
174                 split(/\0\0>/, $line, 3);
175         };
176         my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
177         if (chdir($env{PWD})) {
178                 $client->{env} = \%env;
179                 $client->{pid} = $client_pid;
180                 eval { dispatch($client, split(/\]\0\[/, $argv)) };
181                 say $sock $@ if $@;
182         } else {
183                 say $sock "chdir($env{PWD}): $!"; # implicit close
184         }
185 }
186
187 sub noop {}
188
189 # lei(1) calls this when it can't connect
190 sub lazy_start ($$) {
191         my ($path, $err) = @_;
192         if ($err == ECONNREFUSED) {
193                 unlink($path) or die "unlink($path): $!";
194         } elsif ($err != ENOENT) {
195                 die "connect($path): $!";
196         }
197         my $umask = umask(077) // die("umask(077): $!");
198         my $l = IO::Socket::UNIX->new(Local => $path,
199                                         Listen => 1024,
200                                         Type => SOCK_STREAM) or
201                 $err = $!;
202         umask($umask) or die("umask(restore): $!");
203         $l or return $err;
204         my @st = stat($path) or die "stat($path): $!";
205         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
206         pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
207         my $oldset = PublicInbox::Sigfd::block_signals();
208         my $pid = fork // die "fork: $!";
209         if ($pid) {
210                 PublicInbox::Sigfd::sig_setmask($oldset);
211                 return; # client will connect to $path
212         }
213         openlog($path, 'pid', 'user');
214         local $SIG{__DIE__} = sub {
215                 syslog('crit', "@_");
216                 exit $! if $!;
217                 exit $? >> 8 if $? >> 8;
218                 exit 255;
219         };
220         local $SIG{__WARN__} = sub { syslog('warning', "@_") };
221         open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n";
222         open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n";
223         open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n";
224         setsid();
225         $pid = fork // die "fork: $!";
226         exit if $pid;
227         $0 = "lei-daemon $path";
228         require PublicInbox::Listener;
229         require PublicInbox::EOFpipe;
230         $l->blocking(0);
231         $eof_w->blocking(0);
232         $eof_r->blocking(0);
233         my $listener = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
234         my $exit_code;
235         local $quit = sub {
236                 $exit_code //= shift;
237                 my $tmp = $listener or exit($exit_code);
238                 unlink($path) if defined($path);
239                 syswrite($eof_w, '.');
240                 $l = $listener = $path = undef;
241                 $tmp->close if $tmp; # DS::close
242                 PublicInbox::DS->SetLoopTimeout(1000);
243         };
244         PublicInbox::EOFpipe->new($eof_r, sub {}, undef);
245         my $sig = {
246                 CHLD => \&PublicInbox::DS::enqueue_reap,
247                 QUIT => $quit,
248                 INT => $quit,
249                 TERM => $quit,
250                 HUP => \&noop,
251                 USR1 => \&noop,
252                 USR2 => \&noop,
253         };
254         my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
255         local %SIG = (%SIG, %$sig) if !$sigfd;
256         if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
257                 PublicInbox::DS->SetLoopTimeout(5000);
258         } else {
259                 # wake up every second to accept signals if we don't
260                 # have signalfd or IO::KQueue:
261                 PublicInbox::Sigfd::sig_setmask($oldset);
262                 PublicInbox::DS->SetLoopTimeout(1000);
263         }
264         PublicInbox::DS->SetPostLoopCallback(sub {
265                 my ($dmap, undef) = @_;
266                 if (@st = defined($path) ? stat($path) : ()) {
267                         if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
268                                 warn "$path dev/ino changed, quitting\n";
269                                 $path = undef;
270                         }
271                 } elsif (defined($path)) {
272                         warn "stat($path): $!, quitting ...\n";
273                         undef $path; # don't unlink
274                         $quit->();
275                 }
276                 return 1 if defined($path);
277                 my $now = now();
278                 my $n = 0;
279                 for my $s (values %$dmap) {
280                         $s->can('busy') or next;
281                         if ($s->busy($now)) {
282                                 ++$n;
283                         } else {
284                                 $s->close;
285                         }
286                 }
287                 $n; # true: continue, false: stop
288         });
289         PublicInbox::DS->EventLoop;
290         exit($exit_code // 0);
291 }
292
293 # for users w/o IO::FDPass
294 sub oneshot {
295         dispatch({
296                 0 => *STDIN{IO},
297                 1 => *STDOUT{IO},
298                 2 => *STDERR{IO},
299                 env => \%ENV
300         }, @ARGV);
301 }
302
303 1;