]> Sergey Matveev's repositories - public-inbox.git/blob - lib/PublicInbox/LeiDaemon.pm
fd4d00d421dd9dce375669136411ccadfc38a352
[public-inbox.git] / lib / PublicInbox / LeiDaemon.pm
1 # Copyright (C) 2020 all contributors <meta@public-inbox.org>
2 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
3
4 # Backend for `lei' (local email interface).  Unlike the C10K-oriented
5 # PublicInbox::Daemon, this is designed exclusively to handle trusted
6 # local clients with read/write access to the FS and use as many
7 # system resources as the local user has access to.
8 package PublicInbox::LeiDaemon;
9 use strict;
10 use v5.10.1;
11 use parent qw(PublicInbox::DS);
12 use Getopt::Long ();
13 use Errno qw(EAGAIN ECONNREFUSED ENOENT);
14 use POSIX qw(setsid);
15 use IO::Socket::UNIX;
16 use IO::Handle ();
17 use Sys::Syslog qw(syslog openlog);
18 use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
19 use PublicInbox::Sigfd;
20 use PublicInbox::DS qw(now);
21 use PublicInbox::Spawn qw(spawn);
22 our $quit = sub { exit(shift // 0) };
23 my $glp = Getopt::Long::Parser->new;
24 $glp->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
25
26 # TBD: this is a documentation mechanism to show a subcommand
27 # (may) pass options through to another command:
28 sub pass_through { () }
29
30 # TODO: generate shell completion + help using %CMD and %OPTDESC
31 # command => [ positional_args, 1-line description, Getopt::Long option spec ]
32 our %CMD = ( # sorted in order of importance/use:
33 'query' => [ 'SEARCH-TERMS...', 'search for messages matching terms', qw(
34         save-as=s output|o=s format|f=s dedupe|d=s thread|t augment|a
35         limit|n=i sort|s=s reverse|r offset=i remote local! extinbox!
36         since|after=s until|before=s) ],
37
38 'show' => [ '{MID|OID}', 'show a given object (Message-ID or object ID)',
39         qw(type=s solve! format|f=s dedupe|d=s thread|t remote local!),
40         pass_through('git show') ],
41
42 'add-extinbox' => [ 'URL-OR-PATHNAME',
43         'add/set priority of a publicinbox|extindex for extra matches',
44         qw(prio=i) ],
45 'ls-extinbox' => [ '[FILTER]', 'list publicinbox|extindex locations',
46         qw(format|f=s z local remote) ],
47 'forget-extinbox' => [ '{URL-OR-PATHNAME|--prune}',
48         'exclude further results from a publicinbox|extindex',
49         qw(prune) ],
50
51 'ls-query' => [ '[FILTER]', 'list saved search queries',
52                 qw(name-only format|f=s z) ],
53 'rm-query' => [ 'QUERY_NAME', 'remove a saved search' ],
54 'mv-query' => [ qw(OLD_NAME NEW_NAME), 'rename a saved search' ],
55
56 'plonk' => [ '{--thread|--from=IDENT}',
57         'exclude mail matching From: or thread from non-Message-ID searches',
58         qw(thread|t from|f=s mid=s oid=s) ],
59 'mark' => [ 'MESSAGE-FLAGS', 'set/unset flags on message(s) from stdin',
60         qw(stdin| oid=s exact by-mid|mid:s) ],
61 'forget' => [ '--stdin', 'exclude message(s) on stdin from query results',
62         qw(stdin| oid=s  exact by-mid|mid:s) ],
63
64 'purge-mailsource' => [ '{URL-OR-PATHNAME|--all}',
65         'remove imported messages from IMAP, Maildirs, and MH',
66         qw(exact! all jobs:i indexed) ],
67
68 # code repos are used for `show' to solve blobs from patch mails
69 'add-coderepo' => [ 'PATHNAME', 'add or set priority of a git code repo',
70         qw(prio=i) ],
71 'ls-coderepo' => [ '[FILTER]', 'list known code repos', qw(format|f=s z) ],
72 'forget-coderepo' => [ 'PATHNAME',
73         'stop using repo to solve blobs from patches',
74         qw(prune) ],
75
76 'add-watch' => [ '[URL_OR_PATHNAME]',
77                 'watch for new messages and flag changes',
78         qw(import! flags! interval=s recursive|r exclude=s include=s) ],
79 'ls-watch' => [ '[FILTER]', 'list active watches with numbers and status',
80                 qw(format|f=s z) ],
81 'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote) ],
82 'resume-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote) ],
83 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
84         qw(prune) ],
85
86 'import' => [ '{URL_OR_PATHNAME|--stdin}',
87         'one-shot import/update from URL or filesystem',
88         qw(stdin| limit|n=i offset=i recursive|r exclude=s include=s !flags),
89         ],
90
91 'config' => [ '[ANYTHING...]',
92                 'git-config(1) wrapper for ~/.config/lei/config',
93                 pass_through('git config') ],
94 'daemon-stop' => [ undef, 'stop the lei-daemon' ],
95 'daemon-pid' => [ undef, 'show the PID of the lei-daemon' ],
96 'help' => [ '[SUBCOMMAND]', 'show help' ],
97
98 # XXX do we need this?
99 # 'git' => [ '[ANYTHING...]', 'git(1) wrapper', pass_through('git') ],
100
101 'reorder-local-store-and-break-history' => [ '[REFNAME]',
102         'rewrite git history in an attempt to improve compression',
103         'gc!' ]
104 ); # @CMD
105
106 # switch descriptions, try to keep consistent across commands
107 # $spec: Getopt::Long option specification
108 # $spec => [@ALLOWED_VALUES (default is first), $description],
109 # $spec => $description
110 # "$SUB_COMMAND TAB $spec" => as above
111 my $stdin_formats = [ qw(auto raw mboxrd mboxcl2 mboxcl mboxo),
112                 'specify message input format' ];
113 my $ls_format = [ qw(plain json null), 'listing output format' ];
114 my $show_format = [ qw(plain raw html mboxrd mboxcl2 mboxcl),
115                 'message/object output format' ];
116
117 my %OPTDESC = (
118 'solve!' => 'do not attempt to reconstruct blobs from emails',
119 'save-as=s' => 'save a search terms by given name',
120
121 'type=s' => [qw(any mid git), 'disambiguate type' ],
122
123 'dedupe|d=s' => [qw(content oid mid), 'deduplication strategy'],
124 'thread|t' => 'every message in the same thread as the actual match(es)',
125 'augment|a' => 'augment --output destination instead of clobbering',
126
127 'output|o=s' => "destination (e.g. `/path/to/Maildir', or `-' for stdout)",
128
129 'mark   format|f=s' => $stdin_formats,
130 'forget format|f=s' => $stdin_formats,
131 'query  format|f=s' => [qw(maildir mboxrd mboxcl2 mboxcl html oid),
132                 q[specify output format (default: determined by --output)]],
133 'ls-query       format|f=s' => $ls_format,
134 'ls-extinbox format|f=s' => $ls_format,
135
136 'limit|n=i' => 'integer limit on number of matches (default: 10000)',
137 'offset=i' => 'search result offset (default: 0)',
138
139 'sort|s=s@' => [qw(internaldate date relevance docid),
140                 "order of results `--output'-dependent)"],
141
142 'prio=i' => 'priority of query source',
143
144 'local' => 'limit operations to the local filesystem',
145 'local!' => 'exclude results from the local filesystem',
146 'remote' => 'limit operations to those requiring network access',
147 'remote!' => 'prevent operations requiring network access',
148
149 'mid=s' => 'specify the Message-ID of a message',
150 'oid=s' => 'specify the git object ID of a message',
151
152 'recursive|r' => 'scan directories/mailboxes/newsgroups recursively',
153 'exclude=s' => 'exclude mailboxes/newsgroups based on pattern',
154 'include=s' => 'include mailboxes/newsgroups based on pattern',
155
156 'exact' => 'operate on exact header matches only',
157 'exact!' => 'rely on content match instead of exact header matches',
158
159 'by-mid|mid:s' => 'match only by Message-ID, ignoring contents',
160 'jobs:i' => 'set parallelism level',
161 ); # %OPTDESC
162
163 sub x_it ($$) { # pronounced "exit"
164         my ($client, $code) = @_;
165         if (my $sig = ($code & 127)) {
166                 kill($sig, $client->{pid} // $$);
167         } else {
168                 $code >>= 8;
169                 if (my $sock = $client->{sock}) {
170                         say $sock "exit=$code";
171                 } else { # for oneshot
172                         $quit->($code);
173                 }
174         }
175 }
176
177 sub emit ($$$) {
178         my ($client, $channel, $buf) = @_;
179         print { $client->{$channel} } $buf or warn "print FD[$channel]: $!";
180 }
181
182 sub fail ($$;$) {
183         my ($client, $buf, $exit_code) = @_;
184         $buf .= "\n" unless $buf =~ /\n\z/s;
185         emit($client, 2, $buf);
186         x_it($client, ($exit_code // 1) << 8);
187         undef;
188 }
189
190 sub _help ($;$) {
191         my ($client, $channel) = @_;
192         emit($client, $channel //= 1, <<EOF);
193 usage: lei COMMAND [OPTIONS]
194
195 ...
196 EOF
197         x_it($client, $channel == 2 ? 1 << 8 : 0); # stderr => failure
198 }
199
200 sub assert_args ($$$;$@) {
201         my ($client, $argv, $proto, $opt, @spec) = @_;
202         $opt //= {};
203         push @spec, qw(help|h);
204         $glp->getoptionsfromarray($argv, $opt, @spec) or
205                 return fail($client, 'bad arguments or options');
206         if ($opt->{help}) {
207                 _help($client);
208                 undef;
209         } else {
210                 my ($nreq, $rest) = split(/;/, $proto);
211                 $nreq = (($nreq // '') =~ tr/$/$/);
212                 my $argc = scalar(@$argv);
213                 my $tot = ($rest // '') eq '@' ? $argc : ($proto =~ tr/$/$/);
214                 return 1 if $argc <= $tot && $argc >= $nreq;
215                 _help($client, 2);
216                 undef
217         }
218 }
219
220 sub dispatch {
221         my ($client, $cmd, @argv) = @_;
222         local $SIG{__WARN__} = sub { emit($client, 2, "@_") };
223         local $SIG{__DIE__} = 'DEFAULT';
224         if (defined $cmd) {
225                 my $func = "lei_$cmd";
226                 $func =~ tr/-/_/;
227                 if (my $cb = __PACKAGE__->can($func)) {
228                         $client->{cmd} = $cmd;
229                         $cb->($client, \@argv);
230                 } elsif (grep(/\A-/, $cmd, @argv)) {
231                         assert_args($client, [ $cmd, @argv ], '');
232                 } else {
233                         fail($client, "`$cmd' is not an lei command");
234                 }
235         } else {
236                 _help($client, 2);
237         }
238 }
239
240 sub lei_daemon_pid {
241         my ($client, $argv) = @_;
242         assert_args($client, $argv, '') and emit($client, 1, "$$\n");
243 }
244
245 sub lei_DBG_pwd {
246         my ($client, $argv) = @_;
247         assert_args($client, $argv, '') and
248                 emit($client, 1, "$client->{env}->{PWD}\n");
249 }
250
251 sub lei_DBG_cwd {
252         my ($client, $argv) = @_;
253         require Cwd;
254         assert_args($client, $argv, '') and emit($client, 1, Cwd::cwd()."\n");
255 }
256
257 sub lei_DBG_false { x_it($_[0], 1 << 8) }
258
259 sub lei_daemon_stop {
260         my ($client, $argv) = @_;
261         assert_args($client, $argv, '') and $quit->(0);
262 }
263
264 sub lei_help { _help($_[0]) }
265
266 sub reap_exec { # dwaitpid callback
267         my ($client, $pid) = @_;
268         x_it($client, $?);
269 }
270
271 sub lei_git { # support passing through random git commands
272         my ($client, $argv) = @_;
273         my %opt = map { $_ => $client->{$_} } (0..2);
274         my $pid = spawn(['git', @$argv], $client->{env}, \%opt);
275         PublicInbox::DS::dwaitpid($pid, \&reap_exec, $client);
276 }
277
278 sub accept_dispatch { # Listener {post_accept} callback
279         my ($sock) = @_; # ignore other
280         $sock->blocking(1);
281         $sock->autoflush(1);
282         my $client = { sock => $sock };
283         vec(my $rin = '', fileno($sock), 1) = 1;
284         # `say $sock' triggers "die" in lei(1)
285         for my $i (0..2) {
286                 if (select(my $rout = $rin, undef, undef, 1)) {
287                         my $fd = IO::FDPass::recv(fileno($sock));
288                         if ($fd >= 0) {
289                                 my $rdr = ($fd == 0 ? '<&=' : '>&=');
290                                 if (open(my $fh, $rdr, $fd)) {
291                                         $client->{$i} = $fh;
292                                 } else {
293                                         say $sock "open($rdr$fd) (FD=$i): $!";
294                                         return;
295                                 }
296                         } else {
297                                 say $sock "recv FD=$i: $!";
298                                 return;
299                         }
300                 } else {
301                         say $sock "timed out waiting to recv FD=$i";
302                         return;
303                 }
304         }
305         # $ARGV_STR = join("]\0[", @ARGV);
306         # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
307         # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
308         my ($client_pid, $argv, $env) = do {
309                 local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
310                 chomp(my $line = <$sock>);
311                 split(/\0\0>/, $line, 3);
312         };
313         my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
314         if (chdir($env{PWD})) {
315                 $client->{env} = \%env;
316                 $client->{pid} = $client_pid;
317                 eval { dispatch($client, split(/\]\0\[/, $argv)) };
318                 say $sock $@ if $@;
319         } else {
320                 say $sock "chdir($env{PWD}): $!"; # implicit close
321         }
322 }
323
324 sub noop {}
325
326 # lei(1) calls this when it can't connect
327 sub lazy_start {
328         my ($path, $err) = @_;
329         if ($err == ECONNREFUSED) {
330                 unlink($path) or die "unlink($path): $!";
331         } elsif ($err != ENOENT) {
332                 die "connect($path): $!";
333         }
334         require IO::FDPass;
335         my $umask = umask(077) // die("umask(077): $!");
336         my $l = IO::Socket::UNIX->new(Local => $path,
337                                         Listen => 1024,
338                                         Type => SOCK_STREAM) or
339                 $err = $!;
340         umask($umask) or die("umask(restore): $!");
341         $l or return die "bind($path): $err";
342         my @st = stat($path) or die "stat($path): $!";
343         my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
344         pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
345         my $oldset = PublicInbox::Sigfd::block_signals();
346         my $pid = fork // die "fork: $!";
347         return if $pid;
348         openlog($path, 'pid', 'user');
349         local $SIG{__DIE__} = sub {
350                 syslog('crit', "@_");
351                 exit $! if $!;
352                 exit $? >> 8 if $? >> 8;
353                 exit 255;
354         };
355         local $SIG{__WARN__} = sub { syslog('warning', "@_") };
356         open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!\n";
357         open STDOUT, '>&STDIN' or die "redirect stdout failed: $!\n";
358         open STDERR, '>&STDIN' or die "redirect stderr failed: $!\n";
359         setsid();
360         $pid = fork // die "fork: $!";
361         return if $pid;
362         $0 = "lei-daemon $path";
363         require PublicInbox::Listener;
364         require PublicInbox::EOFpipe;
365         $l->blocking(0);
366         $eof_w->blocking(0);
367         $eof_r->blocking(0);
368         my $listener = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
369         my $exit_code;
370         local $quit = sub {
371                 $exit_code //= shift;
372                 my $tmp = $listener or exit($exit_code);
373                 unlink($path) if defined($path);
374                 syswrite($eof_w, '.');
375                 $l = $listener = $path = undef;
376                 $tmp->close if $tmp; # DS::close
377                 PublicInbox::DS->SetLoopTimeout(1000);
378         };
379         PublicInbox::EOFpipe->new($eof_r, sub {}, undef);
380         my $sig = {
381                 CHLD => \&PublicInbox::DS::enqueue_reap,
382                 QUIT => $quit,
383                 INT => $quit,
384                 TERM => $quit,
385                 HUP => \&noop,
386                 USR1 => \&noop,
387                 USR2 => \&noop,
388         };
389         my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
390         local %SIG = (%SIG, %$sig) if !$sigfd;
391         if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
392                 PublicInbox::DS->SetLoopTimeout(5000);
393         } else {
394                 # wake up every second to accept signals if we don't
395                 # have signalfd or IO::KQueue:
396                 PublicInbox::Sigfd::sig_setmask($oldset);
397                 PublicInbox::DS->SetLoopTimeout(1000);
398         }
399         PublicInbox::DS->SetPostLoopCallback(sub {
400                 my ($dmap, undef) = @_;
401                 if (@st = defined($path) ? stat($path) : ()) {
402                         if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
403                                 warn "$path dev/ino changed, quitting\n";
404                                 $path = undef;
405                         }
406                 } elsif (defined($path)) {
407                         warn "stat($path): $!, quitting ...\n";
408                         undef $path; # don't unlink
409                         $quit->();
410                 }
411                 return 1 if defined($path);
412                 my $now = now();
413                 my $n = 0;
414                 for my $s (values %$dmap) {
415                         $s->can('busy') or next;
416                         if ($s->busy($now)) {
417                                 ++$n;
418                         } else {
419                                 $s->close;
420                         }
421                 }
422                 $n; # true: continue, false: stop
423         });
424         PublicInbox::DS->EventLoop;
425         exit($exit_code // 0);
426 }
427
428 # for users w/o IO::FDPass
429 sub oneshot {
430         dispatch({
431                 0 => *STDIN{IO},
432                 1 => *STDOUT{IO},
433                 2 => *STDERR{IO},
434                 env => \%ENV
435         }, @ARGV);
436 }
437
438 1;