This lets us call dwaitpid long before a process exits
and not have to wait around for it.
This is advantageous for lei where we can run dwaitpid on the
pager as soon as we spawn it, instead of waiting for a client
socket to go away on DESTROY.
use v5.10.1;
use parent qw(Exporter);
use bytes;
use v5.10.1;
use parent qw(Exporter);
use bytes;
+use POSIX qw(WNOHANG sigprocmask SIG_SETMASK);
use IO::Handle qw();
use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use IO::Handle qw();
use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout;
}
($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout;
}
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
+
+sub block_signals () {
+ my $oldset = POSIX::SigSet->new;
+ my $newset = POSIX::SigSet->new;
+ $newset->fillset or die "fillset: $!";
+ sig_setmask($newset, $oldset);
+ $oldset;
+}
+
# We can't use waitpid(-1) safely here since it can hit ``, system(),
# and other things. So we scan the $wait_pids list, which is hopefully
# not too big. We keep $wait_pids small by not calling dwaitpid()
# We can't use waitpid(-1) safely here since it can hit ``, system(),
# and other things. So we scan the $wait_pids list, which is hopefully
# not too big. We keep $wait_pids small by not calling dwaitpid()
$reap_armed = undef;
my $tmp = $wait_pids or return;
$wait_pids = undef;
$reap_armed = undef;
my $tmp = $wait_pids or return;
$wait_pids = undef;
+ my $oldset = block_signals();
foreach my $ary (@$tmp) {
my ($pid, $cb, $arg) = @$ary;
my $ret = waitpid($pid, WNOHANG);
foreach my $ary (@$tmp) {
my ($pid, $cb, $arg) = @$ary;
my $ret = waitpid($pid, WNOHANG);
warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
}
}
warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
}
}
- # we may not be done, yet, and could've missed/masked a SIGCHLD:
- $reap_armed //= requeue(\&reap_pids) if $wait_pids;
}
# reentrant SIGCHLD handler (since reap_pids is not reentrant)
}
# reentrant SIGCHLD handler (since reap_pids is not reentrant)
sub daemon_prepare ($) {
my ($default_listen) = @_;
my $listener_names = {}; # sockname => IO::Handle
sub daemon_prepare ($) {
my ($default_listen) = @_;
my $listener_names = {}; # sockname => IO::Handle
- $oldset = PublicInbox::Sigfd::block_signals();
+ $oldset = PublicInbox::DS::block_signals();
@CMD = ($0, @ARGV);
my ($prog) = ($CMD[0] =~ m!([^/]+)\z!g);
my $help = <<EOF;
@CMD = ($0, @ARGV);
my ($prog) = ($CMD[0] =~ m!([^/]+)\z!g);
my $help = <<EOF;
};
my $sigfd = PublicInbox::Sigfd->new($sig, 0);
local %SIG = (%SIG, %$sig) if !$sigfd;
};
my $sigfd = PublicInbox::Sigfd->new($sig, 0);
local %SIG = (%SIG, %$sig) if !$sigfd;
- PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd;
+ PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
while (1) { # main loop
my $n = scalar keys %pids;
unless (@listeners) {
while (1) { # main loop
my $n = scalar keys %pids;
unless (@listeners) {
}
my $want = $worker_processes - 1;
if ($n <= $want) {
}
my $want = $worker_processes - 1;
if ($n <= $want) {
- PublicInbox::Sigfd::block_signals() if !$sigfd;
+ PublicInbox::DS::block_signals() if !$sigfd;
for my $i ($n..$want) {
my $pid = fork;
if (!defined $pid) {
for my $i ($n..$want) {
my $pid = fork;
if (!defined $pid) {
- PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd;
+ PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
}
if ($sigfd) { # Linux and IO::KQueue users:
}
if ($sigfd) { # Linux and IO::KQueue users:
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->EventLoop;
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->EventLoop;
$pr->("performing initial scan ...\n") if $pr;
my $sync = eidx_sync($self, $opt); # initial sync
return if $sync->{quit};
$pr->("performing initial scan ...\n") if $pr;
my $sync = eidx_sync($self, $opt); # initial sync
return if $sync->{quit};
- my $oldset = PublicInbox::Sigfd::block_signals();
+ my $oldset = PublicInbox::DS::block_signals();
local $self->{current_info} = '';
my $cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
local $self->{current_info} = '';
my $cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
pipe(my ($r_req, $w_req)) or die "pipe: $!";
pipe(my ($r_res, $w_res)) or die "pipe: $!";
delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
pipe(my ($r_req, $w_req)) or die "pipe: $!";
pipe(my ($r_res, $w_res)) or die "pipe: $!";
- my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+ my $sigset = $oldset // PublicInbox::DS::block_signals();
my $parent = $$;
$self->ipc_atfork_parent;
defined(my $pid = fork) or die "fork: $!";
my $parent = $$;
$self->ipc_atfork_parent;
defined(my $pid = fork) or die "fork: $!";
$w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
local $0 = $ident;
$w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
local $0 = $ident;
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
my $on_destroy = $self->ipc_atfork_child;
eval { ipc_worker_loop($self, $r_req, $w_res) };
die "worker $ident PID:$$ died: $@\n" if $@;
exit;
}
my $on_destroy = $self->ipc_atfork_child;
eval { ipc_worker_loop($self, $r_req, $w_res) };
die "worker $ident PID:$$ died: $@\n" if $@;
exit;
}
- PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+ PublicInbox::DS::sig_setmask($sigset) unless $oldset;
$r_req = $w_res = undef;
$w_req->autoflush(1);
$self->{-ipc_req} = $w_req;
$r_req = $w_res = undef;
$w_req->autoflush(1);
$self->{-ipc_req} = $w_req;
my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
$self->{1} = $w;
$self->{2} = $w if -t $self->{2};
my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
$self->{1} = $w;
$self->{2} = $w if -t $self->{2};
- $self->{'pager.pid'} = spawn([$pager], $env, $rdr);
+ my $pid = spawn([$pager], $env, $rdr);
+ dwaitpid($pid, undef, $self->{sock});
$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
}
$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
}
my @st = stat($path) or die "stat($path): $!";
my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
my @st = stat($path) or die "stat($path): $!";
my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
- my $oldset = PublicInbox::Sigfd::block_signals();
+ my $oldset = PublicInbox::DS::block_signals();
if ($nfd == 1) {
require IO::FDPass;
$recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
if ($nfd == 1) {
require IO::FDPass;
$recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
} else {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
} else {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->SetPostLoopCallback(sub {
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->SetPostLoopCallback(sub {
sub DESTROY {
my ($self) = @_;
$self->{1}->autoflush(1);
sub DESTROY {
my ($self) = @_;
$self->{1}->autoflush(1);
- if (my $pid = delete $self->{'pager.pid'}) {
- dwaitpid($pid, undef, $self->{sock});
- }
use strict;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
use strict;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
-use POSIX qw(:signal_h);
use IO::Handle ();
# returns a coderef to unblock signals if neither signalfd or kqueue
use IO::Handle ();
# returns a coderef to unblock signals if neither signalfd or kqueue
while (wait_once($_[0])) {} # non-blocking
}
while (wait_once($_[0])) {} # non-blocking
}
-sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
-
-sub block_signals () {
- my $oldset = POSIX::SigSet->new;
- my $newset = POSIX::SigSet->new;
- $newset->fillset or die "fillset: $!";
- sig_setmask($newset, $oldset);
- $oldset;
-}
-
delete $self->{opendirs};
PublicInbox::DS->Reset;
%SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT');
delete $self->{opendirs};
PublicInbox::DS->Reset;
%SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT');
- PublicInbox::Sigfd::sig_setmask($self->{oldset});
+ PublicInbox::DS::sig_setmask($self->{oldset});
}
sub watch_atfork_parent ($) {
my ($self) = @_;
_done_for_now($self);
}
sub watch_atfork_parent ($) {
my ($self) = @_;
_done_for_now($self);
- PublicInbox::Sigfd::block_signals();
+ PublicInbox::DS::block_signals();
}
sub imap_idle_requeue ($) { # DS::add_timer callback
}
sub imap_idle_requeue ($) { # DS::add_timer callback
imap_idle_fork($self, $url_intvl);
}
};
imap_idle_fork($self, $url_intvl);
}
};
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
die $@ if $@;
}
fs_scan_step($self) if $self->{mdre};
die $@ if $@;
}
fs_scan_step($self) if $self->{mdre};
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
die "fork: $!" unless defined $pid;
$self->{poll_pids}->{$pid} = [ $intvl, $urls ];
PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
die "fork: $!" unless defined $pid;
$self->{poll_pids}->{$pid} = [ $intvl, $urls ];
PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
GetOptions('scan!' => \$do_scan, # undocumented, testing only
'help|h' => \(my $show_help)) or do { print STDERR $help; exit 1 };
if ($show_help) { print $help; exit 0 };
GetOptions('scan!' => \$do_scan, # undocumented, testing only
'help|h' => \(my $show_help)) or do { print STDERR $help; exit 1 };
if ($show_help) { print $help; exit 0 };
-my $oldset = PublicInbox::Sigfd::block_signals();
+my $oldset = PublicInbox::DS::block_signals();
STDOUT->autoflush(1);
STDERR->autoflush(1);
local $0 = $0; # local since this script may be eval-ed
STDOUT->autoflush(1);
STDERR->autoflush(1);
local $0 = $0; # local since this script may be eval-ed
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
local %SIG = (%SIG, %$sig) if !$sigfd;
if (!$sigfd) {
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
local %SIG = (%SIG, %$sig) if !$sigfd;
if (!$sigfd) {
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
}
$watch->watch($sig, $oldset) while ($watch);
PublicInbox::DS->SetLoopTimeout(1000);
}
$watch->watch($sig, $oldset) while ($watch);
select(undef, undef, undef, 0.01) while 1;
}
EOF
select(undef, undef, undef, 0.01) while 1;
}
EOF
- my $oldset = PublicInbox::Sigfd::block_signals();
+ my $oldset = PublicInbox::DS::block_signals();
my $rd = popen_rd([$^X, '-e', $script]);
diag 'waiting for child to reap grandchild...';
chomp(my $line = readline($rd));
my $rd = popen_rd([$^X, '-e', $script]);
diag 'waiting for child to reap grandchild...';
chomp(my $line = readline($rd));
ok(kill('CHLD', $pid), 'sent SIGCHLD to child');
is(readline($rd), "HI\n", '$SIG{CHLD} works in child');
ok(close $rd, 'popen_rd close works');
ok(kill('CHLD', $pid), 'sent SIGCHLD to child');
is(readline($rd), "HI\n", '$SIG{CHLD} works in child');
ok(close $rd, 'popen_rd close works');
- PublicInbox::Sigfd::sig_setmask($oldset);
+ PublicInbox::DS::sig_setmask($oldset);