require Danga::Socket;
require POSIX;
require PublicInbox::Listener;
+require PublicInbox::ParentPipe;
my @CMD;
my $set_user;
my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
}
}
-sub worker_quit () {
+
+sub worker_quit {
+ my ($reason) = @_;
# killing again terminates immediately:
exit unless @listeners;
$_->close foreach @listeners; # call Danga::Socket::close
@listeners = ();
+ $reason->close if ref($reason) eq 'PublicInbox::ParentPipe';
- # give slow clients 30s to finish reading/writing whatever
- Danga::Socket->AddTimer(30, sub { exit });
-
+ my $proc_name;
+ my $warn = 0;
# drop idle connections and try to quit gracefully
Danga::Socket->SetPostLoopCallback(sub {
my ($dmap, undef) = @_;
foreach my $s (values %$dmap) {
if ($s->can('busy') && $s->busy) {
- $n = 1;
+ ++$n;
} else {
# close as much as possible, early as possible
$s->close;
}
}
+ if ($n) {
+ if (($warn + 5) < time) {
+ warn "$$ quitting, $n client(s) left\n";
+ $warn = time;
+ }
+ unless (defined $proc_name) {
+ $proc_name = (split(/\s+/, $0))[0];
+ $proc_name =~ s!\A.*?([^/]+)\z!$1!;
+ }
+ $0 = "$proc_name quitting, $n client(s) left";
+ }
$n; # true: loop continues, false: loop breaks
});
}
}
reopen_logs();
# main loop
+ my $quit = 0;
while (1) {
while (my $s = shift @caught) {
if ($s eq 'USR1') {
} elsif ($s eq 'USR2') {
upgrade();
} elsif ($s =~ /\A(?:QUIT|TERM|INT)\z/) {
- # drops pipes and causes children to die
- exit
+ exit if $quit++;
+ kill_workers($s);
} elsif ($s eq 'WINCH') {
$worker_processes = 0;
} elsif ($s eq 'HUP') {
}
my $n = scalar keys %pids;
+ if ($quit) {
+ exit if $n == 0;
+ $set_workers = $worker_processes = $n = 0;
+ }
+
if ($n > $worker_processes) {
while (my ($k, $v) = each %pids) {
kill('TERM', $k) if $v >= $worker_processes;
my $parent_pipe;
if ($worker_processes > 0) {
$refresh->(); # preload by default
- $parent_pipe = master_loop(); # returns if in child process
- my $fd = fileno($parent_pipe);
- Danga::Socket->AddOtherFds($fd => *worker_quit);
+ my $fh = master_loop(); # returns if in child process
+ $parent_pipe = PublicInbox::ParentPipe->new($fh, *worker_quit);
} else {
reopen_logs();
$set_user->() if $set_user;
- $SIG{USR2} = sub { worker_quit() if upgrade() };
+ $SIG{USR2} = sub { worker_quit('USR2') if upgrade() };
$refresh->();
}
$uid = $gid = undef;
--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# only for PublicInbox::Daemon
+package PublicInbox::ParentPipe;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+use fields qw(cb);
+
+sub new ($$$) {
+ my ($class, $pipe, $cb) = @_;
+ my $self = fields::new($class);
+ $self->SUPER::new($pipe);
+ $self->{cb} = $cb;
+ $self->watch_read(1);
+ $self;
+}
+
+sub event_read { $_[0]->{cb}->($_[0]) }
+
+1;
'XHDR on invalid header returns empty');
{
- syswrite($s, "HDR List-id 1-\r\n");
+ setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1);
+ syswrite($s, 'HDR List-id 1-');
+ select(undef, undef, undef, 0.15);
+ ok(kill('TERM', $pid), 'killed nntpd');
+ select(undef, undef, undef, 0.15);
+ syswrite($s, "\r\n");
$buf = '';
do {
sysread($s, $buf, 4096, length($buf));
is(scalar @r, 1, 'only one response line');
}
- ok(kill('TERM', $pid), 'killed nntpd');
- $pid = undef;
- waitpid(-1, 0);
+ is($pid, waitpid($pid, 0), 'nntpd exited successfully');
+ is($?, 0, 'no error in exited process');
}
done_testing();