]> Sergey Matveev's repositories - public-inbox.git/commitdiff
watch: wire up IMAP IDLE reapers to DS
authorEric Wong <e@yhbt.net>
Sat, 27 Jun 2020 10:03:39 +0000 (10:03 +0000)
committerEric Wong <e@yhbt.net>
Sun, 28 Jun 2020 22:27:16 +0000 (22:27 +0000)
We can avoid synchronous `waitpid(-1, 0)' and save a process
when simultaneously watching Maildirs.

One DS bug is fixed: ->Reset needs to clear the DS $in_loop flag
in forked children so dwaitpid() fails and allows git processes
to be reaped synchronously.  TestCommon also calls DS->Reset
when spawning new processes, since t/imapd.t uses DS->EventLoop
while waiting on -watch to write.

lib/PublicInbox/DS.pm
lib/PublicInbox/TestCommon.pm
lib/PublicInbox/WatchMaildir.pm
script/public-inbox-watch

index da68802dda94a82da154e156207442b4641bdd99..c46b20cba278e33de38b7e3adf4b8e59d3582afc 100644 (file)
@@ -68,7 +68,7 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
-    $wait_pids = $later_queue = undef;
+    $in_loop = $wait_pids = $later_queue = undef;
     $EXPMAP = {};
     $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
index b252810fca51865903e332aaecd2ef4b4f21d2a0..14ebba10563fc3abd5e22146b89b64fd3e5978e0 100644 (file)
@@ -350,6 +350,7 @@ sub start_script {
        }
        defined(my $pid = fork) or die "fork: $!\n";
        if ($pid == 0) {
+               eval { PublicInbox::DS->Reset };
                # pretend to be systemd (cf. sd_listen_fds(3))
                # 3 == SD_LISTEN_FDS_START
                my $fd;
index 4d3cd032e5a5664fb5f41fee42cd68194655fc97..431350be277f881f9a2c5e6a22f4c10ce88c6bb2 100644 (file)
@@ -12,7 +12,7 @@ use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
-use POSIX qw(_exit WNOHANG);
+use POSIX qw(_exit);
 *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
 
 sub compile_watchheaders ($) {
@@ -213,9 +213,8 @@ sub quit {
        }
 }
 
-sub watch_fs {
+sub watch_fs_init ($) {
        my ($self) = @_;
-       require PublicInbox::DirIdle;
        my $done = sub {
                delete $self->{done_timer};
                _done_for_now($self);
@@ -224,10 +223,8 @@ sub watch_fs {
                _try_path($self, $_[0]->fullname);
                $self->{done_timer} //= PublicInbox::DS::requeue($done);
        };
-       my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb);
-       PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
-       PublicInbox::DS->EventLoop;
-       _done_for_now($self);
+       require PublicInbox::DirIdle;
+       PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD
 }
 
 # returns the git config section name, e.g [imap "imaps://user@example.com"]
@@ -334,25 +331,6 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient
        $mic;
 }
 
-sub imap_start ($) {
-       my ($self) = @_;
-       eval { require PublicInbox::IMAPClient } or
-               die "Mail::IMAPClient is required for IMAP:\n$@\n";
-       eval { require Git } or
-               die "Git (Perl module) is required for IMAP:\n$@\n";
-       eval { require PublicInbox::IMAPTracker } or
-               die "DBD::SQLite is required for IMAP\n:$@\n";
-
-       my $mic_args = imap_common_init($self);
-       # make sure we can connect and cache the credentials in memory
-       $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
-       my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
-       for my $url (sort keys %{$self->{imap}}) {
-               my $uri = PublicInbox::URIimap->new($url);
-               $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
-       }
-}
-
 sub imap_fetch_all ($$$) {
        my ($self, $mic, $uri) = @_;
        my $sec = imap_section($uri);
@@ -481,74 +459,76 @@ sub watch_imap_idle_1 ($$$) {
 
 sub watch_atfork_child ($) {
        my ($self) = @_;
+       delete $self->{idle_pids};
+       PublicInbox::DS->Reset;
        PublicInbox::Sigfd::sig_setmask($self->{oldset});
        %SIG = (%SIG, %{$self->{sig}});
 }
 
-sub watch_imap_idle_all ($$) {
-       my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
-       $self->{mics} = {}; # going to be forking, so disconnect
-       my $idle_pids = $self->{idle_pids} = {};
-       until ($self->{quit}) {
-               while (my $uri_intvl = shift @$idle) {
-                       my ($uri, $intvl) = @$uri_intvl;
-                       defined(my $pid = fork) or die "fork: $!";
-                       if ($pid == 0) {
-                               watch_atfork_child($self);
-                               delete $self->{idle_pids};
-                               watch_imap_idle_1($self, $uri, $intvl);
-                               _exit(0);
-                       }
-                       $idle_pids->{$pid} = $uri_intvl;
-               }
-               my $pid = waitpid(-1, 0) or next;
-               if ($pid < 0) {
-                       warn "W: no idling children: $!";
-                       if (@$idle) {
-                               sleep 60;
-                       } else {
-                               warn "W: nothing to respawn, quitting IDLE\n";
-                               last;
-                       }
-               }
-               if (my $uri_intvl = delete $idle_pids->{$pid}) {
-                       my ($uri, $intvl) = @$uri_intvl;
-                       my $url = $uri->as_string;
-                       if ($? || !$self->{quit}) {
-                               warn "W: PID=$pid on $url died: \$?=$?\n";
-                       }
-                       push @$idle, $uri_intvl;
-               } else {
-                       warn "W: PID=$pid (unknown) reaped: \$?=$?\n";
-               }
+sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
+       my ($self, $pid) = @_;
+       my $uri_intvl = delete $self->{idle_pids}->{$pid} or
+               die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+
+       my ($uri, $intvl) = @$uri_intvl;
+       my $url = $uri->as_string;
+       return if $self->{quit};
+       warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
+       push @{$self->{idle_todo}}, $uri_intvl;
+       PubicInbox::DS::requeue($self); # call ->event_step to respawn
+}
+
+sub imap_idle_fork ($$) {
+       my ($self, $uri_intvl) = @_;
+       my ($uri, $intvl) = @$uri_intvl;
+       defined(my $pid = fork) or die "fork: $!";
+       if ($pid == 0) {
+               watch_atfork_child($self);
+               watch_imap_idle_1($self, $uri, $intvl);
+               _exit(0);
        }
+       $self->{idle_pids}->{$pid} = $uri_intvl;
+       PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+}
 
-       # tear it all down
-       kill('QUIT', $_) for (keys %$idle_pids);
-       while (scalar keys %$idle_pids) {
-               if (my $pid = waitpid(-1, WNOHANG)) {
-                       if ($pid < 0) {
-                               warn "E: no children? $! (PIDs: ",
-                                       join(', ', keys %$idle_pids),")\n";
-                               last;
-                       } else {
-                               delete $idle_pids->{$pid};
-                       }
-               } else { # signals aren't that reliable w/o signalfd/kevent
-                       sleep 1;
-                       kill('QUIT', $_) for (keys %$idle_pids);
+sub event_step {
+       my ($self) = @_;
+       return if $self->{quit};
+       my $idle_todo = $self->{idle_todo};
+       if ($idle_todo && @$idle_todo) {
+               $self->{mics} = {}; # going to be forking, so disconnect
+               while (my $uri_intvl = shift(@$idle_todo)) {
+                       imap_idle_fork($self, $uri_intvl);
                }
        }
+       goto(&fs_scan_step) if $self->{mdre};
 }
 
-sub watch_imap ($) {
+sub watch_imap_init ($) {
        my ($self) = @_;
-       my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ];
+       eval { require PublicInbox::IMAPClient } or
+               die "Mail::IMAPClient is required for IMAP:\n$@\n";
+       eval { require Git } or
+               die "Git (Perl module) is required for IMAP:\n$@\n";
+       eval { require PublicInbox::IMAPTracker } or
+               die "DBD::SQLite is required for IMAP\n:$@\n";
+
+       my $mic_args = imap_common_init($self); # read args from config
+
+       # make sure we can connect and cache the credentials in memory
+       $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
+       my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
+       for my $url (sort keys %{$self->{imap}}) {
+               my $uri = PublicInbox::URIimap->new($url);
+               $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
+       }
+
+       my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
        my $poll = {}; # intvl_seconds => [ uri1, uri2 ]
        for my $url (keys %{$self->{imap}}) {
                my $uri = PublicInbox::URIimap->new($url);
                my $sec = imap_section($uri);
-               my $mic = $self->{mics}->{$sec};
+               my $mic = $mics->{$sec};
                my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl};
                if ($mic->has_capability('IDLE') && !$intvl) {
                        $intvl = $self->{imap_opt}->{$sec}->{idle_intvl};
@@ -557,9 +537,10 @@ sub watch_imap ($) {
                        push @{$poll->{$intvl || 120}}, $uri;
                }
        }
-       my $nr_poll = scalar keys %$poll;
-       if (scalar @$idle && !$nr_poll) { # multiple idlers, need fork
-               watch_imap_idle_all($self, $idle);
+       if (scalar @$idle) {
+               $self->{idle_pids} = {};
+               $self->{idle_todo} = $idle;
+               PublicInbox::DS::requeue($self); # ->event_step to fork
        }
        # TODO: polling
 }
@@ -568,21 +549,11 @@ sub watch {
        my ($self, $sig, $oldset) = @_;
        $self->{oldset} = $oldset;
        $self->{sig} = $sig;
-       if ($self->{mdre} && $self->{imap}) {
-               defined(my $pid = fork) or die "fork: $!";
-               if ($pid == 0) {
-                       watch_atfork_child($self);
-                       imap_start($self);
-                       goto &watch_imap;
-               }
-               $self->{-imap_pid} = $pid;
-       } elsif ($self->{imap}) {
-               # not a child process, but no signalfd, yet:
-               watch_atfork_child($self);
-               imap_start($self);
-               goto &watch_imap;
-       }
-       goto &watch_fs;
+       watch_imap_init($self) if $self->{imap};
+       watch_fs_init($self) if $self->{mdre};
+       PublicInbox::DS->SetPostLoopCallback(sub {});
+       PublicInbox::DS->EventLoop until $self->{quit};
+       _done_for_now($self);
 }
 
 sub trigger_scan {
@@ -591,8 +562,7 @@ sub trigger_scan {
        PublicInbox::DS::requeue($self);
 }
 
-# called directly, and by PublicInbox::DS
-sub event_step ($) {
+sub fs_scan_step {
        my ($self) = @_;
        return if $self->{quit};
        my $op = shift @{$self->{ops}};
@@ -634,7 +604,7 @@ sub event_step ($) {
 sub scan {
        my ($self, $op) = @_;
        push @{$self->{ops}}, $op;
-       goto &event_step;
+       goto &fs_scan_step;
 }
 
 sub _importer_for {
index b6d545adad782c81cecb92c49e3dbb7578575546..ae7b70be355f056a0201f51a66bb089f2562371d 100755 (executable)
@@ -22,7 +22,11 @@ if ($watch_md) {
                $watch_md->quit if $watch_md;
                $watch_md = undef;
        };
-       my $sig = { HUP => $reload, USR1 => $scan };
+       my $sig = {
+               HUP => $reload,
+               USR1 => $scan,
+               CHLD => \&PublicInbox::DS::enqueue_reap,
+       };
        $sig->{QUIT} = $sig->{TERM} = $sig->{INT} = $quit;
 
        # --no-scan is only intended for testing atm, undocumented.