]> Sergey Matveev's repositories - public-inbox.git/commitdiff
watch: use signalfd for Maildir watching
authorEric Wong <e@yhbt.net>
Sat, 27 Jun 2020 10:03:37 +0000 (10:03 +0000)
committerEric Wong <e@yhbt.net>
Sun, 28 Jun 2020 22:27:14 +0000 (22:27 +0000)
We can get rid of the janky wannabe
self-using-a-directory-instead-of-pipe thing we needed to
workaround Filesys::Notify::Simple being blocking.

For existing Maildir users, this should be more robust and
immune to missed wakeups for signalfd and kqueue-enabled
systems; as well as being immune to BOFHs clearing $TMPDIR
and preventing notifications from firing.

The IMAP IDLE code still uses normal Perl signals, so it's still
vulnerable to missed wakeups.  That will be addressed in future
commits.

lib/PublicInbox/Daemon.pm
lib/PublicInbox/Sigfd.pm
lib/PublicInbox/WatchMaildir.pm
script/public-inbox-watch
t/watch_maildir.t

index 2f63bd73b4a2ff0dcd6ae4e8618775f76f4315b5..ab0c2226e407037dd5889d0b8a3d271c01da7020 100644 (file)
@@ -18,9 +18,9 @@ use PublicInbox::DS qw(now);
 use PublicInbox::Syscall qw(SFD_NONBLOCK);
 require PublicInbox::Listener;
 require PublicInbox::ParentPipe;
-require PublicInbox::Sigfd;
+use PublicInbox::Sigfd;
 my @CMD;
-my ($set_user, $oldset, $newset);
+my ($set_user, $oldset);
 my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
 my $worker_processes = 1;
 my @listeners;
@@ -72,15 +72,10 @@ sub accept_tls_opt ($) {
        { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
 }
 
-sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
-
 sub daemon_prepare ($) {
        my ($default_listen) = @_;
        my $listener_names = {}; # sockname => IO::Handle
-       $oldset = POSIX::SigSet->new();
-       $newset = POSIX::SigSet->new();
-       $newset->fillset or die "fillset: $!";
-       sig_setmask($newset, $oldset);
+       my $oldset = PublicInbox::Sigfd::block_signals();
        @CMD = ($0, @ARGV);
        my %opts = (
                'l|listen=s' => \@cfg_listen,
@@ -515,7 +510,7 @@ EOF
        };
        my $sigfd = PublicInbox::Sigfd->new($sig, 0);
        local %SIG = (%SIG, %$sig) if !$sigfd;
-       sig_setmask($oldset) if !$sigfd;
+       PublicInbox::restore_signals($oldset) if !$sigfd;
        while (1) { # main loop
                my $n = scalar keys %pids;
                unless (@listeners) {
@@ -531,7 +526,7 @@ EOF
                }
                my $want = $worker_processes - 1;
                if ($n <= $want) {
-                       sig_setmask($newset) if !$sigfd;
+                       PublicInbox::Sigfd::block_signals() if !$sigfd;
                        for my $i ($n..$want) {
                                my $pid = fork;
                                if (!defined $pid) {
@@ -544,7 +539,7 @@ EOF
                                        $pids{$pid} = $i;
                                }
                        }
-                       sig_setmask($oldset) if !$sigfd;
+                       PubliInbox::Sigfd::set_sigmask($oldset) if !$sigfd;
                }
 
                if ($sigfd) { # Linux and IO::KQueue users:
@@ -632,7 +627,7 @@ sub daemon_loop ($$$$) {
        if (!$sigfd) {
                # wake up every second to accept signals if we don't
                # have signalfd or IO::KQueue:
-               sig_setmask($oldset);
+               PublicInbox::Sigfd::set_sigmask($oldset);
                PublicInbox::DS->SetLoopTimeout(1000);
        }
        PublicInbox::DS->EventLoop;
index f500902ea67c8fc533922b0de455ccad679d7ce9..17456592a7e52718d9ca595b282962f53c898ea3 100644 (file)
@@ -5,7 +5,7 @@ use strict;
 use parent qw(PublicInbox::DS);
 use fields qw(sig); # hashref similar to %SIG, but signal numbers as keys
 use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
-use POSIX ();
+use POSIX qw(:signal_h);
 use IO::Handle ();
 
 # returns a coderef to unblock signals if neither signalfd or kqueue
@@ -62,4 +62,14 @@ sub event_step {
        while (wait_once($_[0])) {} # non-blocking
 }
 
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
+
+sub block_signals () {
+       my $oldset = POSIX::SigSet->new;
+       my $newset = POSIX::SigSet->new;
+       $newset->fillset or die "fillset: $!";
+       sig_setmask($newset, $oldset);
+       $oldset;
+}
+
 1;
index 22f190366a4696f7ead6cb9ffa2b2346d0001ca9..4d3cd032e5a5664fb5f41fee42cd68194655fc97 100644 (file)
@@ -8,9 +8,9 @@ use strict;
 use warnings;
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable;
-use File::Temp 0.19 (); # 0.19 for ->newdir
 use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
+use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
 use POSIX qw(_exit WNOHANG);
 *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
@@ -108,6 +108,7 @@ sub new {
                imap => scalar keys %imap ? \%imap : undef,
                importers => {},
                opendirs => {}, # dirname => dirhandle (in progress scans)
+               ops => [], # 'quit', 'full'
        }, $class;
 }
 
@@ -195,7 +196,9 @@ sub _try_path {
 
 sub quit {
        my ($self) = @_;
-       trigger_scan($self, 'quit') or $self->{quit} = 1;
+       $self->{quit} = 1;
+       %{$self->{opendirs}} = ();
+       _done_for_now($self);
        if (my $imap_pid = $self->{-imap_pid}) {
                kill('QUIT', $imap_pid);
        }
@@ -213,24 +216,15 @@ sub quit {
 sub watch_fs {
        my ($self) = @_;
        require PublicInbox::DirIdle;
-       my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX",
-                                       TMPDIR => 1);
-       my $scandir = $self->{scandir} = $scan->dirname;
-       my $scan_re = qr!\A$scandir/!;
        my $done = sub {
                delete $self->{done_timer};
                _done_for_now($self);
        };
        my $cb = sub {
-               my $path = $_[0]->fullname;
-               if ($path =~ $scan_re) {
-                       scan($self, $path);
-               } else {
-                       _try_path($self, $path);
-               }
+               _try_path($self, $_[0]->fullname);
                $self->{done_timer} //= PublicInbox::DS::requeue($done);
        };
-       my $di = PublicInbox::DirIdle->new([@{$self->{mdir}}, $scandir], $cb);
+       my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb);
        PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
        PublicInbox::DS->EventLoop;
        _done_for_now($self);
@@ -485,6 +479,12 @@ sub watch_imap_idle_1 ($$$) {
        }
 }
 
+sub watch_atfork_child ($) {
+       my ($self) = @_;
+       PublicInbox::Sigfd::sig_setmask($self->{oldset});
+       %SIG = (%SIG, %{$self->{sig}});
+}
+
 sub watch_imap_idle_all ($$) {
        my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
        $self->{mics} = {}; # going to be forking, so disconnect
@@ -494,6 +494,7 @@ sub watch_imap_idle_all ($$) {
                        my ($uri, $intvl) = @$uri_intvl;
                        defined(my $pid = fork) or die "fork: $!";
                        if ($pid == 0) {
+                               watch_atfork_child($self);
                                delete $self->{idle_pids};
                                watch_imap_idle_1($self, $uri, $intvl);
                                _exit(0);
@@ -564,15 +565,20 @@ sub watch_imap ($) {
 }
 
 sub watch {
-       my ($self) = @_;
+       my ($self, $sig, $oldset) = @_;
+       $self->{oldset} = $oldset;
+       $self->{sig} = $sig;
        if ($self->{mdre} && $self->{imap}) {
                defined(my $pid = fork) or die "fork: $!";
                if ($pid == 0) {
+                       watch_atfork_child($self);
                        imap_start($self);
                        goto &watch_imap;
                }
                $self->{-imap_pid} = $pid;
        } elsif ($self->{imap}) {
+               # not a child process, but no signalfd, yet:
+               watch_atfork_child($self);
                imap_start($self);
                goto &watch_imap;
        }
@@ -580,23 +586,18 @@ sub watch {
 }
 
 sub trigger_scan {
-       my ($self, $base) = @_;
-       my $dir = $self->{scandir} or return;
-       open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n";
-       close $fh or die "close $dir/$base failed: $!\n";
+       my ($self, $op) = @_;
+       push @{$self->{ops}}, $op;
+       PublicInbox::DS::requeue($self);
 }
 
-sub scan {
-       my ($self, $path) = @_;
-       if ($path =~ /quit\z/) {
-               %{$self->{opendirs}} = ();
-               _done_for_now($self);
-               delete $self->{scandir};
-               $self->{quit} = 1;
-               return;
-       }
-       # else: $path =~ /(cont|full)\z/
+# called directly, and by PublicInbox::DS
+sub event_step ($) {
+       my ($self) = @_;
        return if $self->{quit};
+       my $op = shift @{$self->{ops}};
+
+       # continue existing scan
        my $max = 10;
        my $opendirs = $self->{opendirs};
        my @dirnames = keys %$opendirs;
@@ -609,7 +610,7 @@ sub scan {
                }
                $opendirs->{$dir} = $dh if $n < 0;
        }
-       if ($path =~ /full\z/) {
+       if ($op && $op eq 'full') {
                foreach my $dir (@{$self->{mdir}}) {
                        next if $opendirs->{$dir}; # already in progress
                        my $ok = opendir(my $dh, $dir);
@@ -627,7 +628,13 @@ sub scan {
        }
        _done_for_now($self);
        # do we have more work to do?
-       trigger_scan($self, 'cont') if keys %$opendirs;
+       PublicInbox::DS::requeue($self) if keys %$opendirs;
+}
+
+sub scan {
+       my ($self, $op) = @_;
+       push @{$self->{ops}}, $op;
+       goto &event_step;
 }
 
 sub _importer_for {
index 2057066a2a9038589120d7ed424550aef44b1444..b6d545adad782c81cecb92c49e3dbb7578575546 100755 (executable)
@@ -5,6 +5,10 @@ use strict;
 use warnings;
 use PublicInbox::WatchMaildir;
 use PublicInbox::Config;
+use PublicInbox::DS;
+use PublicInbox::Sigfd;
+use PublicInbox::Syscall qw(SFD_NONBLOCK);
+my $oldset = PublicInbox::Sigfd::block_signals();
 my ($config, $watch_md);
 my $reload = sub {
        $config = PublicInbox::Config->new;
@@ -14,14 +18,22 @@ my $reload = sub {
 $reload->();
 if ($watch_md) {
        my $scan = sub { $watch_md->trigger_scan('full') if $watch_md };
-       $SIG{HUP} = $reload;
-       $SIG{USR1} = $scan;
-       $SIG{ALRM} = sub { $SIG{ALRM} = 'DEFAULT'; $scan->() };
-       $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub {
+       my $quit = sub {
                $watch_md->quit if $watch_md;
                $watch_md = undef;
        };
+       my $sig = { HUP => $reload, USR1 => $scan };
+       $sig->{QUIT} = $sig->{TERM} = $sig->{INT} = $quit;
+
        # --no-scan is only intended for testing atm, undocumented.
-       alarm(1) unless (grep(/\A--no-scan\z/, @ARGV));
-       $watch_md->watch while ($watch_md);
+       unless (grep(/\A--no-scan\z/, @ARGV)) {
+               PublicInbox::DS::requeue($scan);
+       }
+       my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
+       local %SIG = (%SIG, %$sig) if !$sigfd;
+       if (!$sigfd) {
+               PublicInbox::Sigfd::set_sigmask($oldset);
+               PublicInbox::DS->SetLoopTimeout(1000);
+       }
+       $watch_md->watch($sig, $oldset) while ($watch_md);
 }
index a2c09b0351b9ed4df55e0b56d73f209378554e1c..c8658140cf2de1ed6d97c2816f54cfcbd4e49617 100644 (file)
@@ -184,10 +184,10 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
                        my $ino_fdinfo = "/proc/$wm->{pid}/fdinfo/$ino_fd";
                        while (time < $end && open(my $fh, '<', $ino_fdinfo)) {
                                @ino_info = grep(/^inotify wd:/, <$fh>);
-                               last if @ino_info >= 4;
+                               last if @ino_info >= 3;
                                tick;
                        }
-                       $sleep = undef if @ino_info >= 4;
+                       $sleep = undef if @ino_info >= 3;
                }
        }
        if ($sleep) {