git clones may take longer than 30s, much longer... So prepare
to wait almost indefinitely for sockets to timeout and document
the second signal behavior for immediate shutdown.
While we're at it, move parent death handling to a separate
class to avoid Danga::Socket->AddOtherFds, since that does not
allow proper handling the parent pipe being closed and would
actually misterminate a worker prematurely. t/nntpd.t is update
to illustrate the failure with workers enabled.
We will work to keep memory usage low and let clients take their
time without interrupting them.
require Danga::Socket;
require POSIX;
require PublicInbox::Listener;
require Danga::Socket;
require POSIX;
require PublicInbox::Listener;
+require PublicInbox::ParentPipe;
my @CMD;
my $set_user;
my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
my @CMD;
my $set_user;
my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
+
+sub worker_quit {
+ my ($reason) = @_;
# killing again terminates immediately:
exit unless @listeners;
$_->close foreach @listeners; # call Danga::Socket::close
@listeners = ();
# killing again terminates immediately:
exit unless @listeners;
$_->close foreach @listeners; # call Danga::Socket::close
@listeners = ();
+ $reason->close if ref($reason) eq 'PublicInbox::ParentPipe';
- # give slow clients 30s to finish reading/writing whatever
- Danga::Socket->AddTimer(30, sub { exit });
-
+ my $proc_name;
+ my $warn = 0;
# drop idle connections and try to quit gracefully
Danga::Socket->SetPostLoopCallback(sub {
my ($dmap, undef) = @_;
# drop idle connections and try to quit gracefully
Danga::Socket->SetPostLoopCallback(sub {
my ($dmap, undef) = @_;
foreach my $s (values %$dmap) {
if ($s->can('busy') && $s->busy) {
foreach my $s (values %$dmap) {
if ($s->can('busy') && $s->busy) {
} else {
# close as much as possible, early as possible
$s->close;
}
}
} else {
# close as much as possible, early as possible
$s->close;
}
}
+ if ($n) {
+ if (($warn + 5) < time) {
+ warn "$$ quitting, $n client(s) left\n";
+ $warn = time;
+ }
+ unless (defined $proc_name) {
+ $proc_name = (split(/\s+/, $0))[0];
+ $proc_name =~ s!\A.*?([^/]+)\z!$1!;
+ }
+ $0 = "$proc_name quitting, $n client(s) left";
+ }
$n; # true: loop continues, false: loop breaks
});
}
$n; # true: loop continues, false: loop breaks
});
}
}
reopen_logs();
# main loop
}
reopen_logs();
# main loop
while (1) {
while (my $s = shift @caught) {
if ($s eq 'USR1') {
while (1) {
while (my $s = shift @caught) {
if ($s eq 'USR1') {
} elsif ($s eq 'USR2') {
upgrade();
} elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
} elsif ($s eq 'USR2') {
upgrade();
} elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
- # drops pipes and causes children to die
- exit
+ exit if $quit++;
+ kill_workers($s);
} elsif ($s eq 'WINCH') {
$worker_processes = 0;
} elsif ($s eq 'HUP') {
} elsif ($s eq 'WINCH') {
$worker_processes = 0;
} elsif ($s eq 'HUP') {
}
my $n = scalar keys %pids;
}
my $n = scalar keys %pids;
+ if ($quit) {
+ exit if $n == 0;
+ $set_workers = $worker_processes = $n = 0;
+ }
+
if ($n > $worker_processes) {
while (my ($k, $v) = each %pids) {
kill('TERM', $k) if $v >= $worker_processes;
if ($n > $worker_processes) {
while (my ($k, $v) = each %pids) {
kill('TERM', $k) if $v >= $worker_processes;
my $parent_pipe;
if ($worker_processes > 0) {
$refresh->(); # preload by default
my $parent_pipe;
if ($worker_processes > 0) {
$refresh->(); # preload by default
- $parent_pipe = master_loop(); # returns if in child process
- my $fd = fileno($parent_pipe);
- Danga::Socket->AddOtherFds($fd => *worker_quit);
+ my $fh = master_loop(); # returns if in child process
+ $parent_pipe = PublicInbox::ParentPipe->new($fh, *worker_quit);
} else {
reopen_logs();
$set_user->() if $set_user;
} else {
reopen_logs();
$set_user->() if $set_user;
- $SIG{USR2} = sub { worker_quit() if upgrade() };
+ $SIG{USR2} = sub { worker_quit('USR2') if upgrade() };
$refresh->();
}
$uid = $gid = undef;
$refresh->();
}
$uid = $gid = undef;
--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# only for PublicInbox::Daemon
+package PublicInbox::ParentPipe;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+use fields qw(cb);
+
+sub new ($$$) {
+ my ($class, $pipe, $cb) = @_;
+ my $self = fields::new($class);
+ $self->SUPER::new($pipe);
+ $self->{cb} = $cb;
+ $self->watch_read(1);
+ $self;
+}
+
+sub event_read { $_[0]->{cb}->($_[0]) }
+
+1;
'XHDR on invalid header returns empty');
{
'XHDR on invalid header returns empty');
{
- syswrite($s, "HDR List-id 1-\r\n");
+ setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1);
+ syswrite($s, 'HDR List-id 1-');
+ select(undef, undef, undef, 0.15);
+ ok(kill('TERM', $pid), 'killed nntpd');
+ select(undef, undef, undef, 0.15);
+ syswrite($s, "\r\n");
$buf = '';
do {
sysread($s, $buf, 4096, length($buf));
$buf = '';
do {
sysread($s, $buf, 4096, length($buf));
is(scalar @r, 1, 'only one response line');
}
is(scalar @r, 1, 'only one response line');
}
- ok(kill('TERM', $pid), 'killed nntpd');
- $pid = undef;
- waitpid(-1, 0);
+ is($pid, waitpid($pid, 0), 'nntpd exited successfully');
+ is($?, 0, 'no error in exited process');