]> Sergey Matveev's repositories - public-inbox.git/blob - public-inbox-nntpd
588efdd3cc32ea87e5bb6015a688f55459a0cf1b
[public-inbox.git] / public-inbox-nntpd
1 #!/usr/bin/perl -w
2 # Copyright (C) 2015 all contributors <meta@public-inbox.org>
3 # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
4 use strict;
5 use warnings;
6 my @CMD = ($0, @ARGV);
7 require Danga::Socket;
8 require IO::Handle;
9 use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
10 require PublicInbox::NewsGroup;
11 my $nntpd = PublicInbox::NNTPD->new;
12 my $refresh = sub { $nntpd->refresh_groups };
13 $SIG{HUP} = $SIG{USR1} = $SIG{USR2} = $SIG{PIPE} =
14         $SIG{TTIN} = $SIG{TTOU} = $SIG{WINCH} = 'IGNORE';
15
16 $refresh->();
17 my (@cfg_listen, $stdout, $stderr);
18 my $worker_processes = 0;
19 my %opts = (
20         'l|listen=s' => \@cfg_listen,
21         '1|stdout=s' => \$stdout,
22         '2|stderr=s' => \$stderr,
23         'W|worker-processes=i' => \$worker_processes,
24 );
25 GetOptions(%opts) or die "bad command-line args\n";
26 my %pids;
27 my %listener_names;
28 my $reexec_pid;
29 my @listeners = inherit();
30
31 # default NNTP listener if no listeners
32 push @cfg_listen, '0.0.0.0:119' unless (@listeners || @cfg_listen);
33
34 foreach my $l (@cfg_listen) {
35         next if $listener_names{$l}; # already inherited
36         require IO::Socket::INET6; # works for IPv4, too
37         my %o = (
38                 LocalAddr => $l,
39                 ReuseAddr => 1,
40                 Proto => 'tcp',
41         );
42         if (my $s = IO::Socket::INET6->new(%o)) {
43                 $listener_names{sockname($s)} = $s;
44                 push @listeners, $s;
45         } else {
46                 warn "error binding $l: $!\n";
47         }
48 }
49 die 'No listeners bound' unless @listeners;
50 open(STDIN, '+<', '/dev/null');
51
52 if ($worker_processes > 0) {
53         # my ($p0, $p1, $r, $w);
54         pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!\n";
55         my %pwatch = ( fileno($p0) => sub { kill('TERM', $$) } );
56         pipe(my ($r, $w)) or die "failed to create self-pipe: $!\n";
57         IO::Handle::blocking($w, 0);
58         my $set_workers = $worker_processes;
59         my @caught;
60         my $master_pid = $$;
61         foreach my $s (qw(HUP CHLD QUIT INT TERM USR1 USR2 TTIN TTOU WINCH)) {
62                 $SIG{$s} = sub {
63                         return if $$ != $master_pid;
64                         push @caught, $s;
65                         syswrite($w, '.');
66                 };
67         }
68         reopen_logs();
69         # main loop
70         while (1) {
71                 while (my $s = shift @caught) {
72                         if ($s eq 'USR1') {
73                                 reopen_logs();
74                                 kill_workers($s);
75                         } elsif ($s eq 'USR2') {
76                                 upgrade();
77                         } elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
78                                 # drops pipes and causes children to die
79                                 exit
80                         } elsif ($s eq 'WINCH') {
81                                 $worker_processes = 0;
82                         } elsif ($s eq 'HUP') {
83                                 $worker_processes = $set_workers;
84                                 $refresh->();
85                                 kill_workers($s);
86                         } elsif ($s eq 'TTIN') {
87                                 if ($set_workers > $worker_processes) {
88                                         ++$worker_processes;
89                                 } else {
90                                         $worker_processes = ++$set_workers;
91                                 }
92                         } elsif ($s eq 'TTOU') {
93                                 if ($set_workers > 0) {
94                                         $worker_processes = --$set_workers;
95                                 }
96                         } elsif ($s eq 'CHLD') {
97                                 reap_children();
98                         }
99                 }
100
101                 my $n = scalar keys %pids;
102                 if ($n > $worker_processes) {
103                         while (my ($k, $v) = each %pids) {
104                                 kill('TERM', $k) if $v >= $worker_processes;
105                         }
106                         $n = $worker_processes;
107                 }
108                 foreach my $i ($n..($worker_processes - 1)) {
109                         my ($pid, $err) = do_fork();
110                         if (!defined $pid) {
111                                 warn "failed to fork worker[$i]: $err\n";
112                         } elsif ($pid == 0) {
113                                 close($_) for ($w, $r, $p1);
114                                 Danga::Socket->AddOtherFds(%pwatch);
115                                 goto worker;
116                         } else {
117                                 warn "PID=$pid is worker[$i]\n";
118                                 $pids{$pid} = $i;
119                         }
120                 }
121                 sysread($r, my $buf, 8);
122         }
123 } else {
124 worker:
125         # this calls epoll_create:
126         @listeners = map { PublicInbox::Listener->new($_) } @listeners;
127         reopen_logs();
128         $SIG{QUIT} = $SIG{INT} = $SIG{TERM} = *worker_quit;
129         $SIG{USR1} = *reopen_logs;
130         $SIG{HUP} = $refresh;
131         $_->watch_read(1) for @listeners;
132         Danga::Socket->EventLoop;
133 }
134
135 # end of main
136
137 sub worker_quit {
138         # killing again terminates immediately:
139         exit unless @listeners;
140
141         $_->close for @listeners;
142         @listeners = ();
143
144         # drop idle connections and try to quit gracefully
145         Danga::Socket->SetPostLoopCallback(sub {
146                 my ($dmap, undef) = @_;
147                 my $n = 0;
148                 foreach my $s (values %$dmap) {
149                         if ($s->{write_buf_size} || @{$s->{read_push_back}}) {
150                                 ++$n;
151                         } else {
152                                 $s->close;
153                         }
154                 }
155                 $n; # true: loop continues, false: loop breaks
156         });
157 }
158
159 sub reopen_logs {
160         if ($stdout) {
161                 open STDOUT, '>>', $stdout or
162                         warn "failed to redirect stdout to $stdout: $!\n";
163         }
164         if ($stderr) {
165                 open STDERR, '>>', $stderr or
166                         warn "failed to redirect stderr to $stderr: $!\n";
167         }
168 }
169
170 sub sockname {
171         my ($s) = @_;
172         my $n = getsockname($s) or return;
173         my ($port, $addr);
174         if (bytes::length($n) >= 28) {
175                 require Socket6;
176                 ($port, $addr) = Socket6::unpack_sockaddr_in6($n);
177         } else {
178                 ($port, $addr) = Socket::sockaddr_in($n);
179         }
180         if (bytes::length($addr) == 4) {
181                 $n = Socket::inet_ntoa($addr)
182         } else {
183                 $n = '['.Socket6::inet_ntop(Socket6::AF_INET6(), $addr).']';
184         }
185         $n .= ":$port";
186 }
187
188 sub inherit {
189         return () if ($ENV{LISTEN_PID} || 0) != $$;
190         my $fds = $ENV{LISTEN_FDS} or return ();
191         my $end = $fds + 2; # LISTEN_FDS_START - 1
192         my @rv = ();
193         foreach my $fd (3..$end) {
194                 my $s = IO::Handle->new;
195                 $s->fdopen($fd, 'r');
196                 if (my $k = sockname($s)) {
197                         $listener_names{$k} = $s;
198                         push @rv, $s;
199                 } else {
200                         warn "failed to inherit fd=$fd (LISTEN_FDS=$fds)";
201                 }
202         }
203         @rv
204 }
205
206 sub upgrade {
207         if ($reexec_pid) {
208                 warn "upgrade in-progress: $reexec_pid\n";
209                 return;
210         }
211         my ($pid, $err) = do_fork();
212         unless (defined $pid) {
213                 warn "fork failed: $err\n";
214                 return;
215         }
216         if ($pid == 0) {
217                 use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
218                 $ENV{LISTEN_FDS} = scalar @listeners;
219                 $ENV{LISTEN_PID} = $$;
220                 foreach my $s (@listeners) {
221                         my $fl = fcntl($s, F_GETFD, 0);
222                         fcntl($s, F_SETFD, $fl &= ~FD_CLOEXEC);
223                 }
224                 exec @CMD;
225                 die "Failed to exec: $!\n";
226         }
227         $reexec_pid = $pid;
228 }
229
230 sub kill_workers {
231         my ($s) = @_;
232
233         while (my ($pid, $id) = each %pids) {
234                 kill $s, $pid;
235         }
236 }
237
238 sub do_fork {
239         require POSIX;
240         my $new = POSIX::SigSet->new;
241         $new->fillset;
242         my $old = POSIX::SigSet->new;
243         POSIX::sigprocmask(&POSIX::SIG_BLOCK, $new, $old) or
244                                 die "SIG_BLOCK: $!\n";
245         my $pid = fork;
246         my $err = $!;
247         POSIX::sigprocmask(&POSIX::SIG_SETMASK, $old) or
248                                 die "SIG_SETMASK: $!\n";
249         ($pid, $err);
250 }
251
252 sub reap_children {
253         while (1) {
254                 my $p = waitpid(-1, &POSIX::WNOHANG) or return;
255                 if (defined $reexec_pid && $p == $reexec_pid) {
256                         $reexec_pid = undef;
257                         warn "reexec PID($p) died with: $?\n";
258                 } elsif (defined(my $id = delete $pids{$p})) {
259                         warn "worker[$id] PID($p) died with: $?\n";
260                 } elsif ($p > 0) {
261                         warn "unknown PID($p) reaped: $?\n";
262                 } else {
263                         return;
264                 }
265         }
266 }
267
268 1;
269 package PublicInbox::Listener;
270 use strict;
271 use warnings;
272 use base 'Danga::Socket';
273 use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
274 use PublicInbox::NNTP;
275
276 sub new ($$) {
277         my ($class, $s) = @_;
278         setsockopt($s, SOL_SOCKET, SO_KEEPALIVE, 1);
279         setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1);
280         listen($s, 1024);
281         IO::Handle::blocking($s, 0);
282         my $self = fields::new($class);
283         $self->SUPER::new($s);
284 }
285
286 sub event_read {
287         my ($self) = @_;
288         # no loop here, we want to fairly distribute clients
289         # between multiple processes sharing the same socket
290         if (accept(my $c, $self->{sock})) {
291                 IO::Handle::blocking($c, 0); # no accept4 :<
292                 PublicInbox::NNTP->new($c, $nntpd);
293         }
294 }
295
296 1;
297 package PublicInbox::NNTPD;
298 use strict;
299 use warnings;
300 use fields qw(groups err out);
301
302 sub new {
303         my ($class) = @_;
304         my $self = fields::new($class);
305         $self->{groups} = {};
306         $self->{err} = \*STDERR;
307         $self->{out} = \*STDOUT;
308         $self;
309 }
310
311 sub refresh_groups {
312         my ($self) = @_;
313         require PublicInbox::Config;
314         my $pi_config = PublicInbox::Config->new;
315         my $new = {};
316         foreach my $k (keys %$pi_config) {
317                 $k =~ /\Apublicinbox\.([^\.]+)\.mainrepo\z/ or next;
318                 my $g = $1;
319                 my $git_dir = $pi_config->{$k};
320                 my $address = $pi_config->{"publicinbox.$g.address"};
321                 my $ng = PublicInbox::NewsGroup->new($g, $git_dir, $address);
322                 my $old_ng = $self->{groups}->{$g};
323
324                 # Reuse the old one if possible since it can hold references
325                 # to valid mm and gcf objects
326                 if ($old_ng) {
327                         $old_ng->update($ng);
328                         $ng = $old_ng;
329                 }
330
331                 # Only valid if Msgmap works
332                 $new->{$g} = $ng if $ng->mm(1);
333         }
334         # this will destroy old groups that got deleted
335         %{$self->{groups}} = %$new;
336 };
337
338 1;