X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=public-inbox-nntpd;h=23d269d4b778c9ffb6480f81ef3154eac56a72b3;hb=ac6f7081a484a053ddb60a2f8b6b6487664827ac;hp=b66de58efba490facb0bc73a7e839ae0a26bfaa7;hpb=ebb1ae89f3b4a52daaf581e727c0dcbe6b7c103e;p=public-inbox.git diff --git a/public-inbox-nntpd b/public-inbox-nntpd old mode 100644 new mode 100755 index b66de58e..23d269d4 --- a/public-inbox-nntpd +++ b/public-inbox-nntpd @@ -1,339 +1,69 @@ #!/usr/bin/perl -w # Copyright (C) 2015 all contributors # License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt) +# +# Standalone NNTP server for public-inbox. use strict; use warnings; -my @CMD = ($0, @ARGV); -require Danga::Socket; -require IO::Handle; -use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/; +require PublicInbox::Daemon; require PublicInbox::NewsGroup; +require PublicInbox::NNTP; +require PublicInbox::Config; my $nntpd = PublicInbox::NNTPD->new; -my $refresh = sub { $nntpd->refresh_groups }; -$SIG{HUP} = $SIG{USR1} = $SIG{USR2} = $SIG{PIPE} = - $SIG{TTIN} = $SIG{TTOU} = $SIG{WINCH} = 'IGNORE'; - -$refresh->(); -my (@cfg_listen, $stdout, $stderr); -my $worker_processes = 0; -my %opts = ( - 'l|listen=s' => \@cfg_listen, - '1|stdout=s' => \$stdout, - '2|stderr=s' => \$stderr, - 'W|worker-processes=i' => \$worker_processes, -); -GetOptions(%opts) or die "bad command-line args\n"; -my %pids; -my %listener_names; -my $reexec_pid; -my @listeners = inherit(); - -# default NNTP listener if no listeners -push @cfg_listen, '0.0.0.0:119' unless (@listeners || @cfg_listen); - -foreach my $l (@cfg_listen) { - next if $listener_names{$l}; # already inherited - require IO::Socket::INET6; # works for IPv4, too - my %o = ( - LocalAddr => $l, - ReuseAddr => 1, - Proto => 'tcp', - ); - if (my $s = IO::Socket::INET6->new(%o)) { - $listener_names{sockname($s)} = $s; - push @listeners, $s; - } else { - warn "error binding $l: $!\n"; - } -} -die 'No listeners bound' unless @listeners; -open(STDIN, '+<', '/dev/null'); - -if ($worker_processes > 0) { - # my ($p0, $p1, $r, $w); - pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!\n"; - my %pwatch = ( fileno($p0) => sub { kill('TERM', $$) } ); - pipe(my ($r, $w)) or die "failed to create self-pipe: $!\n"; - IO::Handle::blocking($w, 0); - my $set_workers = $worker_processes; - my @caught; - my $master_pid = $$; - foreach my $s (qw(HUP CHLD QUIT INT TERM USR1 USR2 TTIN TTOU WINCH)) { - $SIG{$s} = sub { - return if $$ != $master_pid; - push @caught, $s; - syswrite($w, '.'); - }; - } - reopen_logs(); - # main loop - while (1) { - while (my $s = shift @caught) { - if ($s eq 'USR1') { - reopen_logs(); - kill_workers($s); - } elsif ($s eq 'USR2') { - upgrade(); - } elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) { - # drops pipes and causes children to die - exit - } elsif ($s eq 'WINCH') { - $worker_processes = 0; - } elsif ($s eq 'HUP') { - $worker_processes = $set_workers; - $refresh->(); - kill_workers($s); - } elsif ($s eq 'TTIN') { - if ($set_workers > $worker_processes) { - ++$worker_processes; - } else { - $worker_processes = ++$set_workers; - } - } elsif ($s eq 'TTOU') { - if ($set_workers > 0) { - $worker_processes = --$set_workers; - } - } elsif ($s eq 'CHLD') { - reap_children(); - } - } - - my $n = scalar keys %pids; - if ($n > $worker_processes) { - while (my ($k, $v) = each %pids) { - kill('TERM', $k) if $v >= $worker_processes; - } - $n = $worker_processes; - } - foreach my $i ($n..($worker_processes - 1)) { - my ($pid, $err) = do_fork(); - if (!defined $pid) { - warn "failed to fork worker[$i]: $err\n"; - } elsif ($pid == 0) { - close($_) for ($w, $r, $p1); - Danga::Socket->AddOtherFds(%pwatch); - goto worker; - } else { - warn "PID=$pid is worker[$i]\n"; - $pids{$pid} = $i; - } - } - sysread($r, my $buf, 8); - } -} else { -worker: - # this calls epoll_create: - @listeners = map { PublicInbox::Listener->new($_) } @listeners; - reopen_logs(); - $SIG{QUIT} = $SIG{INT} = $SIG{TERM} = *worker_quit; - $SIG{USR1} = *reopen_logs; - $SIG{HUP} = $refresh; - $_->watch_read(1) for @listeners; - Danga::Socket->EventLoop; -} - -# end of main - -sub worker_quit { - # killing again terminates immediately: - exit unless @listeners; - - $_->close for @listeners; - @listeners = (); - - # drop idle connections and try to quit gracefully - Danga::Socket->SetPostLoopCallback(sub { - my ($dmap, undef) = @_; - my $n = 0; - foreach my $s (values %$dmap) { - next unless ref($s) eq 'PublicInbox::NNTP'; - if ($s->{write_buf_size} || $s->{rbuf}) { - ++$n; - } else { - $s->close; - } - } - $n; # true: loop continues, false: loop breaks - }); -} - -sub reopen_logs { - if ($stdout) { - open STDOUT, '>>', $stdout or - warn "failed to redirect stdout to $stdout: $!\n"; - } - if ($stderr) { - open STDERR, '>>', $stderr or - warn "failed to redirect stderr to $stderr: $!\n"; - } -} - -sub sockname { - my ($s) = @_; - my $n = getsockname($s) or return; - my ($port, $addr); - if (bytes::length($n) >= 28) { - require Socket6; - ($port, $addr) = Socket6::unpack_sockaddr_in6($n); - } else { - ($port, $addr) = Socket::sockaddr_in($n); - } - if (bytes::length($addr) == 4) { - $n = Socket::inet_ntoa($addr) - } else { - $n = '['.Socket6::inet_ntop(Socket6::AF_INET6(), $addr).']'; - } - $n .= ":$port"; -} - -sub inherit { - return () if ($ENV{LISTEN_PID} || 0) != $$; - my $fds = $ENV{LISTEN_FDS} or return (); - my $end = $fds + 2; # LISTEN_FDS_START - 1 - my @rv = (); - foreach my $fd (3..$end) { - my $s = IO::Handle->new; - $s->fdopen($fd, 'r'); - if (my $k = sockname($s)) { - $listener_names{$k} = $s; - push @rv, $s; - } else { - warn "failed to inherit fd=$fd (LISTEN_FDS=$fds)"; - } - } - @rv -} - -sub upgrade { - if ($reexec_pid) { - warn "upgrade in-progress: $reexec_pid\n"; - return; - } - my ($pid, $err) = do_fork(); - unless (defined $pid) { - warn "fork failed: $err\n"; - return; - } - if ($pid == 0) { - use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); - $ENV{LISTEN_FDS} = scalar @listeners; - $ENV{LISTEN_PID} = $$; - foreach my $s (@listeners) { - my $fl = fcntl($s, F_GETFD, 0); - fcntl($s, F_SETFD, $fl &= ~FD_CLOEXEC); - } - exec @CMD; - die "Failed to exec: $!\n"; - } - $reexec_pid = $pid; -} - -sub kill_workers { - my ($s) = @_; - - while (my ($pid, $id) = each %pids) { - kill $s, $pid; - } -} - -sub do_fork { - require POSIX; - my $new = POSIX::SigSet->new; - $new->fillset; - my $old = POSIX::SigSet->new; - POSIX::sigprocmask(&POSIX::SIG_BLOCK, $new, $old) or - die "SIG_BLOCK: $!\n"; - my $pid = fork; - my $err = $!; - POSIX::sigprocmask(&POSIX::SIG_SETMASK, $old) or - die "SIG_SETMASK: $!\n"; - ($pid, $err); -} - -sub reap_children { - while (1) { - my $p = waitpid(-1, &POSIX::WNOHANG) or return; - if (defined $reexec_pid && $p == $reexec_pid) { - $reexec_pid = undef; - warn "reexec PID($p) died with: $?\n"; - } elsif (defined(my $id = delete $pids{$p})) { - warn "worker[$id] PID($p) died with: $?\n"; - } elsif ($p > 0) { - warn "unknown PID($p) reaped: $?\n"; - } else { - return; - } - } -} - -1; -package PublicInbox::Listener; -use strict; -use warnings; -use base 'Danga::Socket'; -use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY); -use PublicInbox::NNTP; - -sub new ($$) { - my ($class, $s) = @_; - setsockopt($s, SOL_SOCKET, SO_KEEPALIVE, 1); - setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1); - listen($s, 1024); - IO::Handle::blocking($s, 0); - my $self = fields::new($class); - $self->SUPER::new($s); -} - -sub event_read { - my ($self) = @_; - # no loop here, we want to fairly distribute clients - # between multiple processes sharing the same socket - if (accept(my $c, $self->{sock})) { - IO::Handle::blocking($c, 0); # no accept4 :< - PublicInbox::NNTP->new($c, $nntpd); - } -} +daemon_run('0.0.0.0:119', + sub { $nntpd->refresh_groups }, # refresh + sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }); # post_accept 1; package PublicInbox::NNTPD; use strict; use warnings; -use fields qw(groups err out); sub new { my ($class) = @_; - my $self = fields::new($class); - $self->{groups} = {}; - $self->{err} = \*STDERR; - $self->{out} = \*STDOUT; - $self; + bless { + groups => {}, + err => \*STDERR, + out => \*STDOUT, + grouplist => [], + }, $class; } -sub refresh_groups { +sub refresh_groups () { my ($self) = @_; - require PublicInbox::Config; my $pi_config = PublicInbox::Config->new; my $new = {}; + my @list; foreach my $k (keys %$pi_config) { $k =~ /\Apublicinbox\.([^\.]+)\.mainrepo\z/ or next; my $g = $1; my $git_dir = $pi_config->{$k}; - my $address = $pi_config->{"publicinbox.$g.address"}; - my $ng = PublicInbox::NewsGroup->new($g, $git_dir, $address); + my $addr = $pi_config->{"publicinbox.$g.address"}; + my $ngname = $pi_config->{"publicinbox.$g.newsgroup"}; + if (defined $ngname) { + next if ($ngname eq ''); # disabled + $g = $ngname; + } + my $ng = PublicInbox::NewsGroup->new($g, $git_dir, $addr); my $old_ng = $self->{groups}->{$g}; - # Reuse the old one if possible since it can hold references - # to valid mm and gcf objects + # Reuse the old one if possible since it can hold + # references to valid mm and gcf objects if ($old_ng) { $old_ng->update($ng); $ng = $old_ng; } - # Only valid if Msgmap works - $new->{$g} = $ng if $ng->mm(1); + # Only valid if msgmap and search works + if ($ng->usable) { + $new->{$g} = $ng; + push @list, $ng; + } } + @list = sort { $a->{name} cmp $b->{name} } @list; + $self->{grouplist} = \@list; # this will destroy old groups that got deleted %{$self->{groups}} = %$new; -}; +} 1;