]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/WatchMaildir.pm
watch: wire up IMAP IDLE reapers to DS
[public-inbox.git] / lib / PublicInbox / WatchMaildir.pm
index fea7d5ef9ee54cd13d6a906dbb4339afaafdb511..431350be277f881f9a2c5e6a22f4c10ce88c6bb2 100644 (file)
@@ -8,11 +8,11 @@ use strict;
 use warnings;
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable;
-use File::Temp 0.19 (); # 0.19 for ->newdir
 use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
+use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
-use POSIX qw(_exit WNOHANG);
+use POSIX qw(_exit);
 *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
 
 sub compile_watchheaders ($) {
@@ -108,6 +108,7 @@ sub new {
                imap => scalar keys %imap ? \%imap : undef,
                importers => {},
                opendirs => {}, # dirname => dirhandle (in progress scans)
+               ops => [], # 'quit', 'full'
        }, $class;
 }
 
@@ -119,19 +120,6 @@ sub _done_for_now {
        }
 }
 
-sub _try_fsn_paths {
-       my ($self, $scan_re, $paths) = @_;
-       foreach (@$paths) {
-               my $path = $_->{path};
-               if ($path =~ $scan_re) {
-                       scan($self, $path);
-               } else {
-                       _try_path($self, $path);
-               }
-       }
-       _done_for_now($self);
-}
-
 sub remove_eml_i { # each_inbox callback
        my ($ibx, $arg) = @_;
        my ($self, $eml, $loc) = @$arg;
@@ -208,7 +196,9 @@ sub _try_path {
 
 sub quit {
        my ($self) = @_;
-       trigger_scan($self, 'quit') or $self->{quit} = 1;
+       $self->{quit} = 1;
+       %{$self->{opendirs}} = ();
+       _done_for_now($self);
        if (my $imap_pid = $self->{-imap_pid}) {
                kill('QUIT', $imap_pid);
        }
@@ -223,18 +213,18 @@ sub quit {
        }
 }
 
-sub watch_fs {
+sub watch_fs_init ($) {
        my ($self) = @_;
-       my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX",
-                                       TMPDIR => 1);
-       my $scandir = $self->{scandir} = $scan->dirname;
-       my $re = qr!\A$scandir/!;
-       my $cb = sub { _try_fsn_paths($self, $re, \@_) };
-
-       eval { require Filesys::Notify::Simple } or
-               die "Filesys::Notify::Simple is currently required for $0\n";
-       my $fsn = Filesys::Notify::Simple->new([@{$self->{mdir}}, $scandir]);
-       $fsn->wait($cb) until $self->{quit};
+       my $done = sub {
+               delete $self->{done_timer};
+               _done_for_now($self);
+       };
+       my $cb = sub {
+               _try_path($self, $_[0]->fullname);
+               $self->{done_timer} //= PublicInbox::DS::requeue($done);
+       };
+       require PublicInbox::DirIdle;
+       PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD
 }
 
 # returns the git config section name, e.g [imap "imaps://user@example.com"]
@@ -341,25 +331,6 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient
        $mic;
 }
 
-sub imap_start ($) {
-       my ($self) = @_;
-       eval { require PublicInbox::IMAPClient } or
-               die "Mail::IMAPClient is required for IMAP:\n$@\n";
-       eval { require Git } or
-               die "Git (Perl module) is required for IMAP:\n$@\n";
-       eval { require PublicInbox::IMAPTracker } or
-               die "DBD::SQLite is required for IMAP\n:$@\n";
-
-       my $mic_args = imap_common_init($self);
-       # make sure we can connect and cache the credentials in memory
-       $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
-       my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
-       for my $url (sort keys %{$self->{imap}}) {
-               my $uri = PublicInbox::URIimap->new($url);
-               $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
-       }
-}
-
 sub imap_fetch_all ($$$) {
        my ($self, $mic, $uri) = @_;
        my $sec = imap_section($uri);
@@ -486,69 +457,78 @@ sub watch_imap_idle_1 ($$$) {
        }
 }
 
-sub watch_imap_idle_all ($$) {
-       my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
-       $self->{mics} = {}; # going to be forking, so disconnect
-       my $idle_pids = $self->{idle_pids} = {};
-       until ($self->{quit}) {
-               while (my $uri_intvl = shift @$idle) {
-                       my ($uri, $intvl) = @$uri_intvl;
-                       defined(my $pid = fork) or die "fork: $!";
-                       if ($pid == 0) {
-                               delete $self->{idle_pids};
-                               watch_imap_idle_1($self, $uri, $intvl);
-                               _exit(0);
-                       }
-                       $idle_pids->{$pid} = $uri_intvl;
-               }
-               my $pid = waitpid(-1, 0) or next;
-               if ($pid < 0) {
-                       warn "W: no idling children: $!";
-                       if (@$idle) {
-                               sleep 60;
-                       } else {
-                               warn "W: nothing to respawn, quitting IDLE\n";
-                               last;
-                       }
-               }
-               if (my $uri_intvl = delete $idle_pids->{$pid}) {
-                       my ($uri, $intvl) = @$uri_intvl;
-                       my $url = $uri->as_string;
-                       if ($? || !$self->{quit}) {
-                               warn "W: PID=$pid on $url died: \$?=$?\n";
-                       }
-                       push @$idle, $uri_intvl;
-               } else {
-                       warn "W: PID=$pid (unknown) reaped: \$?=$?\n";
-               }
-       }
+sub watch_atfork_child ($) {
+       my ($self) = @_;
+       delete $self->{idle_pids};
+       PublicInbox::DS->Reset;
+       PublicInbox::Sigfd::sig_setmask($self->{oldset});
+       %SIG = (%SIG, %{$self->{sig}});
+}
 
-       # tear it all down
-       kill('QUIT', $_) for (keys %$idle_pids);
-       while (scalar keys %$idle_pids) {
-               if (my $pid = waitpid(-1, WNOHANG)) {
-                       if ($pid < 0) {
-                               warn "E: no children? $! (PIDs: ",
-                                       join(', ', keys %$idle_pids),")\n";
-                               last;
-                       } else {
-                               delete $idle_pids->{$pid};
-                       }
-               } else { # signals aren't that reliable w/o signalfd/kevent
-                       sleep 1;
-                       kill('QUIT', $_) for (keys %$idle_pids);
+sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
+       my ($self, $pid) = @_;
+       my $uri_intvl = delete $self->{idle_pids}->{$pid} or
+               die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+
+       my ($uri, $intvl) = @$uri_intvl;
+       my $url = $uri->as_string;
+       return if $self->{quit};
+       warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
+       push @{$self->{idle_todo}}, $uri_intvl;
+       PubicInbox::DS::requeue($self); # call ->event_step to respawn
+}
+
+sub imap_idle_fork ($$) {
+       my ($self, $uri_intvl) = @_;
+       my ($uri, $intvl) = @$uri_intvl;
+       defined(my $pid = fork) or die "fork: $!";
+       if ($pid == 0) {
+               watch_atfork_child($self);
+               watch_imap_idle_1($self, $uri, $intvl);
+               _exit(0);
+       }
+       $self->{idle_pids}->{$pid} = $uri_intvl;
+       PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+}
+
+sub event_step {
+       my ($self) = @_;
+       return if $self->{quit};
+       my $idle_todo = $self->{idle_todo};
+       if ($idle_todo && @$idle_todo) {
+               $self->{mics} = {}; # going to be forking, so disconnect
+               while (my $uri_intvl = shift(@$idle_todo)) {
+                       imap_idle_fork($self, $uri_intvl);
                }
        }
+       goto(&fs_scan_step) if $self->{mdre};
 }
 
-sub watch_imap ($) {
+sub watch_imap_init ($) {
        my ($self) = @_;
-       my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ];
+       eval { require PublicInbox::IMAPClient } or
+               die "Mail::IMAPClient is required for IMAP:\n$@\n";
+       eval { require Git } or
+               die "Git (Perl module) is required for IMAP:\n$@\n";
+       eval { require PublicInbox::IMAPTracker } or
+               die "DBD::SQLite is required for IMAP\n:$@\n";
+
+       my $mic_args = imap_common_init($self); # read args from config
+
+       # make sure we can connect and cache the credentials in memory
+       $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
+       my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
+       for my $url (sort keys %{$self->{imap}}) {
+               my $uri = PublicInbox::URIimap->new($url);
+               $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
+       }
+
+       my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
        my $poll = {}; # intvl_seconds => [ uri1, uri2 ]
        for my $url (keys %{$self->{imap}}) {
                my $uri = PublicInbox::URIimap->new($url);
                my $sec = imap_section($uri);
-               my $mic = $self->{mics}->{$sec};
+               my $mic = $mics->{$sec};
                my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl};
                if ($mic->has_capability('IDLE') && !$intvl) {
                        $intvl = $self->{imap_opt}->{$sec}->{idle_intvl};
@@ -557,47 +537,37 @@ sub watch_imap ($) {
                        push @{$poll->{$intvl || 120}}, $uri;
                }
        }
-       my $nr_poll = scalar keys %$poll;
-       if (scalar @$idle && !$nr_poll) { # multiple idlers, need fork
-               watch_imap_idle_all($self, $idle);
+       if (scalar @$idle) {
+               $self->{idle_pids} = {};
+               $self->{idle_todo} = $idle;
+               PublicInbox::DS::requeue($self); # ->event_step to fork
        }
        # TODO: polling
 }
 
 sub watch {
-       my ($self) = @_;
-       if ($self->{mdre} && $self->{imap}) {
-               defined(my $pid = fork) or die "fork: $!";
-               if ($pid == 0) {
-                       imap_start($self);
-                       goto &watch_imap;
-               }
-               $self->{-imap_pid} = $pid;
-       } elsif ($self->{imap}) {
-               imap_start($self);
-               goto &watch_imap;
-       }
-       goto &watch_fs;
+       my ($self, $sig, $oldset) = @_;
+       $self->{oldset} = $oldset;
+       $self->{sig} = $sig;
+       watch_imap_init($self) if $self->{imap};
+       watch_fs_init($self) if $self->{mdre};
+       PublicInbox::DS->SetPostLoopCallback(sub {});
+       PublicInbox::DS->EventLoop until $self->{quit};
+       _done_for_now($self);
 }
 
 sub trigger_scan {
-       my ($self, $base) = @_;
-       my $dir = $self->{scandir} or return;
-       open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n";
-       close $fh or die "close $dir/$base failed: $!\n";
+       my ($self, $op) = @_;
+       push @{$self->{ops}}, $op;
+       PublicInbox::DS::requeue($self);
 }
 
-sub scan {
-       my ($self, $path) = @_;
-       if ($path =~ /quit\z/) {
-               %{$self->{opendirs}} = ();
-               _done_for_now($self);
-               delete $self->{scandir};
-               $self->{quit} = 1;
-               return;
-       }
-       # else: $path =~ /(cont|full)\z/
+sub fs_scan_step {
+       my ($self) = @_;
        return if $self->{quit};
+       my $op = shift @{$self->{ops}};
+
+       # continue existing scan
        my $max = 10;
        my $opendirs = $self->{opendirs};
        my @dirnames = keys %$opendirs;
@@ -610,7 +580,7 @@ sub scan {
                }
                $opendirs->{$dir} = $dh if $n < 0;
        }
-       if ($path =~ /full\z/) {
+       if ($op && $op eq 'full') {
                foreach my $dir (@{$self->{mdir}}) {
                        next if $opendirs->{$dir}; # already in progress
                        my $ok = opendir(my $dh, $dir);
@@ -628,7 +598,13 @@ sub scan {
        }
        _done_for_now($self);
        # do we have more work to do?
-       trigger_scan($self, 'cont') if keys %$opendirs;
+       PublicInbox::DS::requeue($self) if keys %$opendirs;
+}
+
+sub scan {
+       my ($self, $op) = @_;
+       push @{$self->{ops}}, $op;
+       goto &fs_scan_step;
 }
 
 sub _importer_for {