summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
b2b1006)
We can get rid of the janky wannabe
self-using-a-directory-instead-of-pipe thing we needed to
workaround Filesys::Notify::Simple being blocking.
For existing Maildir users, this should be more robust and
immune to missed wakeups for signalfd and kqueue-enabled
systems; as well as being immune to BOFHs clearing $TMPDIR
and preventing notifications from firing.
The IMAP IDLE code still uses normal Perl signals, so it's still
vulnerable to missed wakeups. That will be addressed in future
commits.
use PublicInbox::Syscall qw(SFD_NONBLOCK);
require PublicInbox::Listener;
require PublicInbox::ParentPipe;
use PublicInbox::Syscall qw(SFD_NONBLOCK);
require PublicInbox::Listener;
require PublicInbox::ParentPipe;
-require PublicInbox::Sigfd;
-my ($set_user, $oldset, $newset);
+my ($set_user, $oldset);
my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
my $worker_processes = 1;
my @listeners;
my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
my $worker_processes = 1;
my @listeners;
{ SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
}
{ SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
}
-sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
-
sub daemon_prepare ($) {
my ($default_listen) = @_;
my $listener_names = {}; # sockname => IO::Handle
sub daemon_prepare ($) {
my ($default_listen) = @_;
my $listener_names = {}; # sockname => IO::Handle
- $oldset = POSIX::SigSet->new();
- $newset = POSIX::SigSet->new();
- $newset->fillset or die "fillset: $!";
- sig_setmask($newset, $oldset);
+ my $oldset = PublicInbox::Sigfd::block_signals();
@CMD = ($0, @ARGV);
my %opts = (
'l|listen=s' => \@cfg_listen,
@CMD = ($0, @ARGV);
my %opts = (
'l|listen=s' => \@cfg_listen,
};
my $sigfd = PublicInbox::Sigfd->new($sig, 0);
local %SIG = (%SIG, %$sig) if !$sigfd;
};
my $sigfd = PublicInbox::Sigfd->new($sig, 0);
local %SIG = (%SIG, %$sig) if !$sigfd;
- sig_setmask($oldset) if !$sigfd;
+ PublicInbox::restore_signals($oldset) if !$sigfd;
while (1) { # main loop
my $n = scalar keys %pids;
unless (@listeners) {
while (1) { # main loop
my $n = scalar keys %pids;
unless (@listeners) {
}
my $want = $worker_processes - 1;
if ($n <= $want) {
}
my $want = $worker_processes - 1;
if ($n <= $want) {
- sig_setmask($newset) if !$sigfd;
+ PublicInbox::Sigfd::block_signals() if !$sigfd;
for my $i ($n..$want) {
my $pid = fork;
if (!defined $pid) {
for my $i ($n..$want) {
my $pid = fork;
if (!defined $pid) {
- sig_setmask($oldset) if !$sigfd;
+ PubliInbox::Sigfd::set_sigmask($oldset) if !$sigfd;
}
if ($sigfd) { # Linux and IO::KQueue users:
}
if ($sigfd) { # Linux and IO::KQueue users:
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
+ PublicInbox::Sigfd::set_sigmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->EventLoop;
PublicInbox::DS->SetLoopTimeout(1000);
}
PublicInbox::DS->EventLoop;
use parent qw(PublicInbox::DS);
use fields qw(sig); # hashref similar to %SIG, but signal numbers as keys
use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
use parent qw(PublicInbox::DS);
use fields qw(sig); # hashref similar to %SIG, but signal numbers as keys
use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
+use POSIX qw(:signal_h);
use IO::Handle ();
# returns a coderef to unblock signals if neither signalfd or kqueue
use IO::Handle ();
# returns a coderef to unblock signals if neither signalfd or kqueue
while (wait_once($_[0])) {} # non-blocking
}
while (wait_once($_[0])) {} # non-blocking
}
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
+
+sub block_signals () {
+ my $oldset = POSIX::SigSet->new;
+ my $newset = POSIX::SigSet->new;
+ $newset->fillset or die "fillset: $!";
+ sig_setmask($newset, $oldset);
+ $oldset;
+}
+
use warnings;
use PublicInbox::Eml;
use PublicInbox::InboxWritable;
use warnings;
use PublicInbox::Eml;
use PublicInbox::InboxWritable;
-use File::Temp 0.19 (); # 0.19 for ->newdir
use PublicInbox::Filter::Base qw(REJECT);
use PublicInbox::Spamcheck;
use PublicInbox::Filter::Base qw(REJECT);
use PublicInbox::Spamcheck;
use PublicInbox::DS qw(now);
use POSIX qw(_exit WNOHANG);
*mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
use PublicInbox::DS qw(now);
use POSIX qw(_exit WNOHANG);
*mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
imap => scalar keys %imap ? \%imap : undef,
importers => {},
opendirs => {}, # dirname => dirhandle (in progress scans)
imap => scalar keys %imap ? \%imap : undef,
importers => {},
opendirs => {}, # dirname => dirhandle (in progress scans)
+ ops => [], # 'quit', 'full'
sub quit {
my ($self) = @_;
sub quit {
my ($self) = @_;
- trigger_scan($self, 'quit') or $self->{quit} = 1;
+ $self->{quit} = 1;
+ %{$self->{opendirs}} = ();
+ _done_for_now($self);
if (my $imap_pid = $self->{-imap_pid}) {
kill('QUIT', $imap_pid);
}
if (my $imap_pid = $self->{-imap_pid}) {
kill('QUIT', $imap_pid);
}
sub watch_fs {
my ($self) = @_;
require PublicInbox::DirIdle;
sub watch_fs {
my ($self) = @_;
require PublicInbox::DirIdle;
- my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX",
- TMPDIR => 1);
- my $scandir = $self->{scandir} = $scan->dirname;
- my $scan_re = qr!\A$scandir/!;
my $done = sub {
delete $self->{done_timer};
_done_for_now($self);
};
my $cb = sub {
my $done = sub {
delete $self->{done_timer};
_done_for_now($self);
};
my $cb = sub {
- my $path = $_[0]->fullname;
- if ($path =~ $scan_re) {
- scan($self, $path);
- } else {
- _try_path($self, $path);
- }
+ _try_path($self, $_[0]->fullname);
$self->{done_timer} //= PublicInbox::DS::requeue($done);
};
$self->{done_timer} //= PublicInbox::DS::requeue($done);
};
- my $di = PublicInbox::DirIdle->new([@{$self->{mdir}}, $scandir], $cb);
+ my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb);
PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
PublicInbox::DS->EventLoop;
_done_for_now($self);
PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
PublicInbox::DS->EventLoop;
_done_for_now($self);
+sub watch_atfork_child ($) {
+ my ($self) = @_;
+ PublicInbox::Sigfd::sig_setmask($self->{oldset});
+ %SIG = (%SIG, %{$self->{sig}});
+}
+
sub watch_imap_idle_all ($$) {
my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
$self->{mics} = {}; # going to be forking, so disconnect
sub watch_imap_idle_all ($$) {
my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
$self->{mics} = {}; # going to be forking, so disconnect
my ($uri, $intvl) = @$uri_intvl;
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
my ($uri, $intvl) = @$uri_intvl;
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
+ watch_atfork_child($self);
delete $self->{idle_pids};
watch_imap_idle_1($self, $uri, $intvl);
_exit(0);
delete $self->{idle_pids};
watch_imap_idle_1($self, $uri, $intvl);
_exit(0);
+ my ($self, $sig, $oldset) = @_;
+ $self->{oldset} = $oldset;
+ $self->{sig} = $sig;
if ($self->{mdre} && $self->{imap}) {
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
if ($self->{mdre} && $self->{imap}) {
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
+ watch_atfork_child($self);
imap_start($self);
goto &watch_imap;
}
$self->{-imap_pid} = $pid;
} elsif ($self->{imap}) {
imap_start($self);
goto &watch_imap;
}
$self->{-imap_pid} = $pid;
} elsif ($self->{imap}) {
+ # not a child process, but no signalfd, yet:
+ watch_atfork_child($self);
imap_start($self);
goto &watch_imap;
}
imap_start($self);
goto &watch_imap;
}
- my ($self, $base) = @_;
- my $dir = $self->{scandir} or return;
- open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n";
- close $fh or die "close $dir/$base failed: $!\n";
+ my ($self, $op) = @_;
+ push @{$self->{ops}}, $op;
+ PublicInbox::DS::requeue($self);
-sub scan {
- my ($self, $path) = @_;
- if ($path =~ /quit\z/) {
- %{$self->{opendirs}} = ();
- _done_for_now($self);
- delete $self->{scandir};
- $self->{quit} = 1;
- return;
- }
- # else: $path =~ /(cont|full)\z/
+# called directly, and by PublicInbox::DS
+sub event_step ($) {
+ my ($self) = @_;
+ my $op = shift @{$self->{ops}};
+
+ # continue existing scan
my $max = 10;
my $opendirs = $self->{opendirs};
my @dirnames = keys %$opendirs;
my $max = 10;
my $opendirs = $self->{opendirs};
my @dirnames = keys %$opendirs;
}
$opendirs->{$dir} = $dh if $n < 0;
}
}
$opendirs->{$dir} = $dh if $n < 0;
}
- if ($path =~ /full\z/) {
+ if ($op && $op eq 'full') {
foreach my $dir (@{$self->{mdir}}) {
next if $opendirs->{$dir}; # already in progress
my $ok = opendir(my $dh, $dir);
foreach my $dir (@{$self->{mdir}}) {
next if $opendirs->{$dir}; # already in progress
my $ok = opendir(my $dh, $dir);
}
_done_for_now($self);
# do we have more work to do?
}
_done_for_now($self);
# do we have more work to do?
- trigger_scan($self, 'cont') if keys %$opendirs;
+ PublicInbox::DS::requeue($self) if keys %$opendirs;
+}
+
+sub scan {
+ my ($self, $op) = @_;
+ push @{$self->{ops}}, $op;
+ goto &event_step;
use warnings;
use PublicInbox::WatchMaildir;
use PublicInbox::Config;
use warnings;
use PublicInbox::WatchMaildir;
use PublicInbox::Config;
+use PublicInbox::DS;
+use PublicInbox::Sigfd;
+use PublicInbox::Syscall qw(SFD_NONBLOCK);
+my $oldset = PublicInbox::Sigfd::block_signals();
my ($config, $watch_md);
my $reload = sub {
$config = PublicInbox::Config->new;
my ($config, $watch_md);
my $reload = sub {
$config = PublicInbox::Config->new;
$reload->();
if ($watch_md) {
my $scan = sub { $watch_md->trigger_scan('full') if $watch_md };
$reload->();
if ($watch_md) {
my $scan = sub { $watch_md->trigger_scan('full') if $watch_md };
- $SIG{HUP} = $reload;
- $SIG{USR1} = $scan;
- $SIG{ALRM} = sub { $SIG{ALRM} = 'DEFAULT'; $scan->() };
- $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub {
$watch_md->quit if $watch_md;
$watch_md = undef;
};
$watch_md->quit if $watch_md;
$watch_md = undef;
};
+ my $sig = { HUP => $reload, USR1 => $scan };
+ $sig->{QUIT} = $sig->{TERM} = $sig->{INT} = $quit;
+
# --no-scan is only intended for testing atm, undocumented.
# --no-scan is only intended for testing atm, undocumented.
- alarm(1) unless (grep(/\A--no-scan\z/, @ARGV));
- $watch_md->watch while ($watch_md);
+ unless (grep(/\A--no-scan\z/, @ARGV)) {
+ PublicInbox::DS::requeue($scan);
+ }
+ my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
+ local %SIG = (%SIG, %$sig) if !$sigfd;
+ if (!$sigfd) {
+ PublicInbox::Sigfd::set_sigmask($oldset);
+ PublicInbox::DS->SetLoopTimeout(1000);
+ }
+ $watch_md->watch($sig, $oldset) while ($watch_md);
my $ino_fdinfo = "/proc/$wm->{pid}/fdinfo/$ino_fd";
while (time < $end && open(my $fh, '<', $ino_fdinfo)) {
@ino_info = grep(/^inotify wd:/, <$fh>);
my $ino_fdinfo = "/proc/$wm->{pid}/fdinfo/$ino_fd";
while (time < $end && open(my $fh, '<', $ino_fdinfo)) {
@ino_info = grep(/^inotify wd:/, <$fh>);
- last if @ino_info >= 4;
+ last if @ino_info >= 3;
- $sleep = undef if @ino_info >= 4;
+ $sleep = undef if @ino_info >= 3;