]> Sergey Matveev's repositories - public-inbox.git/blob - public-inbox-nntpd
remove bytes:: for length checks
[public-inbox.git] / public-inbox-nntpd
1 #!/usr/bin/perl -w
2 # Copyright (C) 2015 all contributors <meta@public-inbox.org>
3 # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
4 use strict;
5 use warnings;
6 my @CMD = ($0, @ARGV);
7 require Danga::Socket;
8 require IO::Handle;
9 use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
10 require PublicInbox::NewsGroup;
11 my $nntpd = PublicInbox::NNTPD->new;
12 my $refresh = sub { $nntpd->refresh_groups };
13 $SIG{HUP} = $SIG{USR1} = $SIG{USR2} = $SIG{PIPE} =
14         $SIG{TTIN} = $SIG{TTOU} = $SIG{WINCH} = 'IGNORE';
15
16 $refresh->();
17 my (@cfg_listen, $stdout, $stderr);
18 my $worker_processes = 0;
19 my %opts = (
20         'l|listen=s' => \@cfg_listen,
21         '1|stdout=s' => \$stdout,
22         '2|stderr=s' => \$stderr,
23         'W|worker-processes=i' => \$worker_processes,
24 );
25 GetOptions(%opts) or die "bad command-line args\n";
26 my %pids;
27 my %listener_names;
28 my $reexec_pid;
29 my @listeners = inherit();
30
31 # default NNTP listener if no listeners
32 push @cfg_listen, '0.0.0.0:119' unless (@listeners || @cfg_listen);
33
34 foreach my $l (@cfg_listen) {
35         next if $listener_names{$l}; # already inherited
36         require IO::Socket::INET6; # works for IPv4, too
37         my %o = (
38                 LocalAddr => $l,
39                 ReuseAddr => 1,
40                 Proto => 'tcp',
41         );
42         if (my $s = IO::Socket::INET6->new(%o)) {
43                 $listener_names{sockname($s)} = $s;
44                 push @listeners, $s;
45         } else {
46                 warn "error binding $l: $!\n";
47         }
48 }
49 die 'No listeners bound' unless @listeners;
50 open(STDIN, '+<', '/dev/null');
51
52 if ($worker_processes > 0) {
53         # my ($p0, $p1, $r, $w);
54         pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!\n";
55         my %pwatch = ( fileno($p0) => sub { kill('TERM', $$) } );
56         pipe(my ($r, $w)) or die "failed to create self-pipe: $!\n";
57         IO::Handle::blocking($w, 0);
58         my $set_workers = $worker_processes;
59         my @caught;
60         my $master_pid = $$;
61         foreach my $s (qw(HUP CHLD QUIT INT TERM USR1 USR2 TTIN TTOU WINCH)) {
62                 $SIG{$s} = sub {
63                         return if $$ != $master_pid;
64                         push @caught, $s;
65                         syswrite($w, '.');
66                 };
67         }
68         reopen_logs();
69         # main loop
70         while (1) {
71                 while (my $s = shift @caught) {
72                         if ($s eq 'USR1') {
73                                 reopen_logs();
74                                 kill_workers($s);
75                         } elsif ($s eq 'USR2') {
76                                 upgrade();
77                         } elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
78                                 # drops pipes and causes children to die
79                                 exit
80                         } elsif ($s eq 'WINCH') {
81                                 $worker_processes = 0;
82                         } elsif ($s eq 'HUP') {
83                                 $worker_processes = $set_workers;
84                                 $refresh->();
85                                 kill_workers($s);
86                         } elsif ($s eq 'TTIN') {
87                                 if ($set_workers > $worker_processes) {
88                                         ++$worker_processes;
89                                 } else {
90                                         $worker_processes = ++$set_workers;
91                                 }
92                         } elsif ($s eq 'TTOU') {
93                                 if ($set_workers > 0) {
94                                         $worker_processes = --$set_workers;
95                                 }
96                         } elsif ($s eq 'CHLD') {
97                                 reap_children();
98                         }
99                 }
100
101                 my $n = scalar keys %pids;
102                 if ($n > $worker_processes) {
103                         while (my ($k, $v) = each %pids) {
104                                 kill('TERM', $k) if $v >= $worker_processes;
105                         }
106                         $n = $worker_processes;
107                 }
108                 foreach my $i ($n..($worker_processes - 1)) {
109                         my ($pid, $err) = do_fork();
110                         if (!defined $pid) {
111                                 warn "failed to fork worker[$i]: $err\n";
112                         } elsif ($pid == 0) {
113                                 close($_) for ($w, $r, $p1);
114                                 Danga::Socket->AddOtherFds(%pwatch);
115                                 goto worker;
116                         } else {
117                                 warn "PID=$pid is worker[$i]\n";
118                                 $pids{$pid} = $i;
119                         }
120                 }
121                 sysread($r, my $buf, 8);
122         }
123 } else {
124 worker:
125         # this calls epoll_create:
126         @listeners = map { PublicInbox::Listener->new($_) } @listeners;
127         reopen_logs();
128         $SIG{QUIT} = $SIG{INT} = $SIG{TERM} = *worker_quit;
129         $SIG{USR1} = *reopen_logs;
130         $SIG{HUP} = $refresh;
131         $_->watch_read(1) for @listeners;
132         Danga::Socket->EventLoop;
133 }
134
135 # end of main
136
137 sub worker_quit {
138         # killing again terminates immediately:
139         exit unless @listeners;
140
141         $_->close for @listeners;
142         @listeners = ();
143
144         # drop idle connections and try to quit gracefully
145         Danga::Socket->SetPostLoopCallback(sub {
146                 my ($dmap, undef) = @_;
147                 my $n = 0;
148                 foreach my $s (values %$dmap) {
149                         next unless ref($s) eq 'PublicInbox::NNTP';
150                         if ($s->{write_buf_size} || $s->{rbuf}) {
151                                 ++$n;
152                         } else {
153                                 $s->close;
154                         }
155                 }
156                 $n; # true: loop continues, false: loop breaks
157         });
158 }
159
160 sub reopen_logs {
161         if ($stdout) {
162                 open STDOUT, '>>', $stdout or
163                         warn "failed to redirect stdout to $stdout: $!\n";
164         }
165         if ($stderr) {
166                 open STDERR, '>>', $stderr or
167                         warn "failed to redirect stderr to $stderr: $!\n";
168         }
169 }
170
171 sub sockname {
172         my ($s) = @_;
173         my $n = getsockname($s) or return;
174         my ($port, $addr);
175         if (length($n) >= 28) {
176                 require Socket6;
177                 ($port, $addr) = Socket6::unpack_sockaddr_in6($n);
178         } else {
179                 ($port, $addr) = Socket::sockaddr_in($n);
180         }
181         if (length($addr) == 4) {
182                 $n = Socket::inet_ntoa($addr)
183         } else {
184                 $n = '['.Socket6::inet_ntop(Socket6::AF_INET6(), $addr).']';
185         }
186         $n .= ":$port";
187 }
188
189 sub inherit {
190         return () if ($ENV{LISTEN_PID} || 0) != $$;
191         my $fds = $ENV{LISTEN_FDS} or return ();
192         my $end = $fds + 2; # LISTEN_FDS_START - 1
193         my @rv = ();
194         foreach my $fd (3..$end) {
195                 my $s = IO::Handle->new;
196                 $s->fdopen($fd, 'r');
197                 if (my $k = sockname($s)) {
198                         $listener_names{$k} = $s;
199                         push @rv, $s;
200                 } else {
201                         warn "failed to inherit fd=$fd (LISTEN_FDS=$fds)";
202                 }
203         }
204         @rv
205 }
206
207 sub upgrade {
208         if ($reexec_pid) {
209                 warn "upgrade in-progress: $reexec_pid\n";
210                 return;
211         }
212         my ($pid, $err) = do_fork();
213         unless (defined $pid) {
214                 warn "fork failed: $err\n";
215                 return;
216         }
217         if ($pid == 0) {
218                 use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
219                 $ENV{LISTEN_FDS} = scalar @listeners;
220                 $ENV{LISTEN_PID} = $$;
221                 foreach my $s (@listeners) {
222                         my $fl = fcntl($s, F_GETFD, 0);
223                         fcntl($s, F_SETFD, $fl &= ~FD_CLOEXEC);
224                 }
225                 exec @CMD;
226                 die "Failed to exec: $!\n";
227         }
228         $reexec_pid = $pid;
229 }
230
231 sub kill_workers {
232         my ($s) = @_;
233
234         while (my ($pid, $id) = each %pids) {
235                 kill $s, $pid;
236         }
237 }
238
239 sub do_fork {
240         require POSIX;
241         my $new = POSIX::SigSet->new;
242         $new->fillset;
243         my $old = POSIX::SigSet->new;
244         POSIX::sigprocmask(&POSIX::SIG_BLOCK, $new, $old) or
245                                 die "SIG_BLOCK: $!\n";
246         my $pid = fork;
247         my $err = $!;
248         POSIX::sigprocmask(&POSIX::SIG_SETMASK, $old) or
249                                 die "SIG_SETMASK: $!\n";
250         ($pid, $err);
251 }
252
253 sub reap_children {
254         while (1) {
255                 my $p = waitpid(-1, &POSIX::WNOHANG) or return;
256                 if (defined $reexec_pid && $p == $reexec_pid) {
257                         $reexec_pid = undef;
258                         warn "reexec PID($p) died with: $?\n";
259                 } elsif (defined(my $id = delete $pids{$p})) {
260                         warn "worker[$id] PID($p) died with: $?\n";
261                 } elsif ($p > 0) {
262                         warn "unknown PID($p) reaped: $?\n";
263                 } else {
264                         return;
265                 }
266         }
267 }
268
269 1;
270 package PublicInbox::Listener;
271 use strict;
272 use warnings;
273 use base 'Danga::Socket';
274 use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
275 use PublicInbox::NNTP;
276
277 sub new ($$) {
278         my ($class, $s) = @_;
279         setsockopt($s, SOL_SOCKET, SO_KEEPALIVE, 1);
280         setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1);
281         listen($s, 1024);
282         IO::Handle::blocking($s, 0);
283         my $self = fields::new($class);
284         $self->SUPER::new($s);
285 }
286
287 sub event_read {
288         my ($self) = @_;
289         # no loop here, we want to fairly distribute clients
290         # between multiple processes sharing the same socket
291         if (accept(my $c, $self->{sock})) {
292                 IO::Handle::blocking($c, 0); # no accept4 :<
293                 PublicInbox::NNTP->new($c, $nntpd);
294         }
295 }
296
297 1;
298 package PublicInbox::NNTPD;
299 use strict;
300 use warnings;
301 use fields qw(groups grouplist err out);
302
303 sub new {
304         my ($class) = @_;
305         my $self = fields::new($class);
306         $self->{groups} = {};
307         $self->{err} = \*STDERR;
308         $self->{out} = \*STDOUT;
309         $self->{grouplist} = [];
310         $self;
311 }
312
313 sub refresh_groups {
314         my ($self) = @_;
315         require PublicInbox::Config;
316         my $pi_config = PublicInbox::Config->new;
317         my $new = {};
318         my @list;
319         foreach my $k (keys %$pi_config) {
320                 $k =~ /\Apublicinbox\.([^\.]+)\.mainrepo\z/ or next;
321                 my $g = $1;
322                 my $git_dir = $pi_config->{$k};
323                 my $addr = $pi_config->{"publicinbox.$g.address"};
324                 my $ngname = $pi_config->{"publicinbox.$g.newsgroup"};
325                 if (defined $ngname) {
326                         next if ($ngname eq ''); # disabled
327                         $g = $ngname;
328                 }
329                 my $ng = PublicInbox::NewsGroup->new($g, $git_dir, $addr);
330                 my $old_ng = $self->{groups}->{$g};
331
332                 # Reuse the old one if possible since it can hold
333                 # references to valid mm and gcf objects
334                 if ($old_ng) {
335                         $old_ng->update($ng);
336                         $ng = $old_ng;
337                 }
338
339                 # Only valid if Msgmap works
340                 if ($ng->mm(1)) {
341                         $new->{$g} = $ng;
342                         push @list, $ng;
343                 }
344         }
345         @list = sort { $a->{name} cmp $b->{name} } @list;
346         $self->{grouplist} = \@list;
347         # this will destroy old groups that got deleted
348         %{$self->{groups}} = %$new;
349 }
350
351 1;