use warnings;
use PublicInbox::Eml;
use PublicInbox::InboxWritable;
-use File::Temp 0.19 (); # 0.19 for ->newdir
use PublicInbox::Filter::Base qw(REJECT);
use PublicInbox::Spamcheck;
+use PublicInbox::Sigfd;
use PublicInbox::DS qw(now);
-use POSIX qw(_exit WNOHANG);
+use POSIX qw(_exit);
*mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
sub compile_watchheaders ($) {
imap => scalar keys %imap ? \%imap : undef,
importers => {},
opendirs => {}, # dirname => dirhandle (in progress scans)
+ ops => [], # 'quit', 'full'
}, $class;
}
sub quit {
my ($self) = @_;
- trigger_scan($self, 'quit') or $self->{quit} = 1;
+ $self->{quit} = 1;
+ %{$self->{opendirs}} = ();
+ _done_for_now($self);
if (my $imap_pid = $self->{-imap_pid}) {
kill('QUIT', $imap_pid);
}
}
}
-sub watch_fs {
+sub watch_fs_init ($) {
my ($self) = @_;
- require PublicInbox::DirIdle;
- my $scan = File::Temp->newdir("public-inbox-watch.$$.scan.XXXXXX",
- TMPDIR => 1);
- my $scandir = $self->{scandir} = $scan->dirname;
- my $scan_re = qr!\A$scandir/!;
my $done = sub {
delete $self->{done_timer};
_done_for_now($self);
};
my $cb = sub {
- my $path = $_[0]->fullname;
- if ($path =~ $scan_re) {
- scan($self, $path);
- } else {
- _try_path($self, $path);
- }
+ _try_path($self, $_[0]->fullname);
$self->{done_timer} //= PublicInbox::DS::requeue($done);
};
- my $di = PublicInbox::DirIdle->new([@{$self->{mdir}}, $scandir], $cb);
- PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} });
- PublicInbox::DS->EventLoop;
- _done_for_now($self);
+ require PublicInbox::DirIdle;
+ PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD
}
# returns the git config section name, e.g [imap "imaps://user@example.com"]
$mic;
}
-sub imap_start ($) {
- my ($self) = @_;
- eval { require PublicInbox::IMAPClient } or
- die "Mail::IMAPClient is required for IMAP:\n$@\n";
- eval { require Git } or
- die "Git (Perl module) is required for IMAP:\n$@\n";
- eval { require PublicInbox::IMAPTracker } or
- die "DBD::SQLite is required for IMAP\n:$@\n";
-
- my $mic_args = imap_common_init($self);
- # make sure we can connect and cache the credentials in memory
- $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
- my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
- for my $url (sort keys %{$self->{imap}}) {
- my $uri = PublicInbox::URIimap->new($url);
- $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
- }
-}
-
sub imap_fetch_all ($$$) {
my ($self, $mic, $uri) = @_;
my $sec = imap_section($uri);
}
}
-sub watch_imap_idle_all ($$) {
- my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]]
- $self->{mics} = {}; # going to be forking, so disconnect
- my $idle_pids = $self->{idle_pids} = {};
- until ($self->{quit}) {
- while (my $uri_intvl = shift @$idle) {
- my ($uri, $intvl) = @$uri_intvl;
- defined(my $pid = fork) or die "fork: $!";
- if ($pid == 0) {
- delete $self->{idle_pids};
- watch_imap_idle_1($self, $uri, $intvl);
- _exit(0);
- }
- $idle_pids->{$pid} = $uri_intvl;
- }
- my $pid = waitpid(-1, 0) or next;
- if ($pid < 0) {
- warn "W: no idling children: $!";
- if (@$idle) {
- sleep 60;
- } else {
- warn "W: nothing to respawn, quitting IDLE\n";
- last;
- }
- }
- if (my $uri_intvl = delete $idle_pids->{$pid}) {
- my ($uri, $intvl) = @$uri_intvl;
- my $url = $uri->as_string;
- if ($? || !$self->{quit}) {
- warn "W: PID=$pid on $url died: \$?=$?\n";
- }
- push @$idle, $uri_intvl;
- } else {
- warn "W: PID=$pid (unknown) reaped: \$?=$?\n";
- }
- }
+sub watch_atfork_child ($) {
+ my ($self) = @_;
+ delete $self->{idle_pids};
+ PublicInbox::DS->Reset;
+ PublicInbox::Sigfd::sig_setmask($self->{oldset});
+ %SIG = (%SIG, %{$self->{sig}});
+}
- # tear it all down
- kill('QUIT', $_) for (keys %$idle_pids);
- while (scalar keys %$idle_pids) {
- if (my $pid = waitpid(-1, WNOHANG)) {
- if ($pid < 0) {
- warn "E: no children? $! (PIDs: ",
- join(', ', keys %$idle_pids),")\n";
- last;
- } else {
- delete $idle_pids->{$pid};
- }
- } else { # signals aren't that reliable w/o signalfd/kevent
- sleep 1;
- kill('QUIT', $_) for (keys %$idle_pids);
+sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
+ my ($self, $pid) = @_;
+ my $uri_intvl = delete $self->{idle_pids}->{$pid} or
+ die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+
+ my ($uri, $intvl) = @$uri_intvl;
+ my $url = $uri->as_string;
+ return if $self->{quit};
+ warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
+ push @{$self->{idle_todo}}, $uri_intvl;
+ PubicInbox::DS::requeue($self); # call ->event_step to respawn
+}
+
+sub imap_idle_fork ($$) {
+ my ($self, $uri_intvl) = @_;
+ my ($uri, $intvl) = @$uri_intvl;
+ defined(my $pid = fork) or die "fork: $!";
+ if ($pid == 0) {
+ watch_atfork_child($self);
+ watch_imap_idle_1($self, $uri, $intvl);
+ _exit(0);
+ }
+ $self->{idle_pids}->{$pid} = $uri_intvl;
+ PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+}
+
+sub event_step {
+ my ($self) = @_;
+ return if $self->{quit};
+ my $idle_todo = $self->{idle_todo};
+ if ($idle_todo && @$idle_todo) {
+ $self->{mics} = {}; # going to be forking, so disconnect
+ while (my $uri_intvl = shift(@$idle_todo)) {
+ imap_idle_fork($self, $uri_intvl);
}
}
+ goto(&fs_scan_step) if $self->{mdre};
}
-sub watch_imap ($) {
+sub watch_imap_init ($) {
my ($self) = @_;
- my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ];
+ eval { require PublicInbox::IMAPClient } or
+ die "Mail::IMAPClient is required for IMAP:\n$@\n";
+ eval { require Git } or
+ die "Git (Perl module) is required for IMAP:\n$@\n";
+ eval { require PublicInbox::IMAPTracker } or
+ die "DBD::SQLite is required for IMAP\n:$@\n";
+
+ my $mic_args = imap_common_init($self); # read args from config
+
+ # make sure we can connect and cache the credentials in memory
+ $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args
+ my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj
+ for my $url (sort keys %{$self->{imap}}) {
+ my $uri = PublicInbox::URIimap->new($url);
+ $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args);
+ }
+
+ my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
my $poll = {}; # intvl_seconds => [ uri1, uri2 ]
for my $url (keys %{$self->{imap}}) {
my $uri = PublicInbox::URIimap->new($url);
my $sec = imap_section($uri);
- my $mic = $self->{mics}->{$sec};
+ my $mic = $mics->{$sec};
my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl};
if ($mic->has_capability('IDLE') && !$intvl) {
$intvl = $self->{imap_opt}->{$sec}->{idle_intvl};
push @{$poll->{$intvl || 120}}, $uri;
}
}
- my $nr_poll = scalar keys %$poll;
- if (scalar @$idle && !$nr_poll) { # multiple idlers, need fork
- watch_imap_idle_all($self, $idle);
+ if (scalar @$idle) {
+ $self->{idle_pids} = {};
+ $self->{idle_todo} = $idle;
+ PublicInbox::DS::requeue($self); # ->event_step to fork
}
# TODO: polling
}
sub watch {
- my ($self) = @_;
- if ($self->{mdre} && $self->{imap}) {
- defined(my $pid = fork) or die "fork: $!";
- if ($pid == 0) {
- imap_start($self);
- goto &watch_imap;
- }
- $self->{-imap_pid} = $pid;
- } elsif ($self->{imap}) {
- imap_start($self);
- goto &watch_imap;
- }
- goto &watch_fs;
+ my ($self, $sig, $oldset) = @_;
+ $self->{oldset} = $oldset;
+ $self->{sig} = $sig;
+ watch_imap_init($self) if $self->{imap};
+ watch_fs_init($self) if $self->{mdre};
+ PublicInbox::DS->SetPostLoopCallback(sub {});
+ PublicInbox::DS->EventLoop until $self->{quit};
+ _done_for_now($self);
}
sub trigger_scan {
- my ($self, $base) = @_;
- my $dir = $self->{scandir} or return;
- open my $fh, '>', "$dir/$base" or die "open $dir/$base failed: $!\n";
- close $fh or die "close $dir/$base failed: $!\n";
+ my ($self, $op) = @_;
+ push @{$self->{ops}}, $op;
+ PublicInbox::DS::requeue($self);
}
-sub scan {
- my ($self, $path) = @_;
- if ($path =~ /quit\z/) {
- %{$self->{opendirs}} = ();
- _done_for_now($self);
- delete $self->{scandir};
- $self->{quit} = 1;
- return;
- }
- # else: $path =~ /(cont|full)\z/
+sub fs_scan_step {
+ my ($self) = @_;
return if $self->{quit};
+ my $op = shift @{$self->{ops}};
+
+ # continue existing scan
my $max = 10;
my $opendirs = $self->{opendirs};
my @dirnames = keys %$opendirs;
}
$opendirs->{$dir} = $dh if $n < 0;
}
- if ($path =~ /full\z/) {
+ if ($op && $op eq 'full') {
foreach my $dir (@{$self->{mdir}}) {
next if $opendirs->{$dir}; # already in progress
my $ok = opendir(my $dh, $dir);
}
_done_for_now($self);
# do we have more work to do?
- trigger_scan($self, 'cont') if keys %$opendirs;
+ PublicInbox::DS::requeue($self) if keys %$opendirs;
+}
+
+sub scan {
+ my ($self, $op) = @_;
+ push @{$self->{ops}}, $op;
+ goto &fs_scan_step;
}
sub _importer_for {