sub _done_for_now {
my ($self) = @_;
- my $importers = $self->{importers};
- foreach my $im (values %$importers) {
- $im->done;
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
+ for (values %{$self->{importers}}) {
+ $_->done if $_; # $_ may be undef during cleanup
}
}
warn "unmappable dir: $1\n";
return;
}
- if (!ref($inboxes) && $inboxes eq 'watchspam') {
- return _remove_spam($self, $path);
- }
-
my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
local $SIG{__WARN__} = sub {
$warn_cb->("path: $path\n");
$warn_cb->(@_);
};
+ if (!ref($inboxes) && $inboxes eq 'watchspam') {
+ return _remove_spam($self, $path);
+ }
foreach my $ibx (@$inboxes) {
my $eml = mime_from_path($path) or next;
import_eml($self, $ibx, $eml);
}
}
+sub quit_done ($) {
+ my ($self) = @_;
+ return unless $self->{quit};
+
+ # don't have reliable wakeups, keep signalling
+ my $done = 1;
+ for (qw(idle_pids poll_pids)) {
+ my $pids = $self->{$_} or next;
+ for (keys %$pids) {
+ $done = undef if kill('QUIT', $_);
+ }
+ }
+ $done;
+}
+
sub quit {
my ($self) = @_;
$self->{quit} = 1;
%{$self->{opendirs}} = ();
_done_for_now($self);
- if (my $imap_pid = $self->{-imap_pid}) {
- kill('QUIT', $imap_pid);
- }
- for (qw(idle_pids poll_pids)) {
- my $pids = $self->{$_} or next;
- kill('QUIT', $_) for (keys %$pids);
- }
+ quit_done($self);
if (my $idle_mic = $self->{idle_mic}) {
eval { $idle_mic->done };
if ($@) {
}
$self->{idle_mic} = $mic; # for ->quit
my @res;
- until ($self->{quit} || grep(/^\* [0-9]+ EXISTS/, @res) || $i <= 0) {
+ until ($self->{quit} || !$mic->IsConnected ||
+ grep(/^\* [0-9]+ EXISTS/, @res) || $i <= 0) {
@res = $mic->idle_data($i);
$i = $end - now();
}
local $0 = $uri->mailbox." $sec";
until ($self->{quit}) {
$mic //= PublicInbox::IMAPClient->new(%$mic_arg);
- my $err = imap_fetch_all($self, $mic, $url);
- $err //= imap_idle_once($self, $mic, $intvl, $url);
+ my $err;
+ if ($mic && $mic->IsConnected) {
+ $err = imap_fetch_all($self, $mic, $url);
+ $err //= imap_idle_once($self, $mic, $intvl, $url);
+ } else {
+ $err = "not connected: $!";
+ }
if ($err && !$self->{quit}) {
warn $err, "\n";
$mic = undef;
delete $self->{poll_pids};
delete $self->{opendirs};
PublicInbox::DS->Reset;
+ %SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT');
PublicInbox::Sigfd::sig_setmask($self->{oldset});
- %SIG = (%SIG, %{$self->{sig}});
}
sub watch_atfork_parent ($) {
_done_for_now($self);
}
+sub imap_idle_requeue ($) { # DS::add_timer callback
+ my ($self, $url_intvl) = @{$_[0]};
+ return if $self->{quit};
+ push @{$self->{idle_todo}}, $url_intvl;
+ event_step($self);
+}
+
sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
my ($self, $pid) = @_;
my $url_intvl = delete $self->{idle_pids}->{$pid} or
my ($url, $intvl) = @$url_intvl;
return if $self->{quit};
warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
- push @{$self->{idle_todo}}, $url_intvl;
- PubicInbox::DS::requeue($self); # call ->event_step to respawn
+ PublicInbox::DS::add_timer(60,
+ \&imap_idle_requeue, [ $self, $url_intvl ]);
}
sub imap_idle_fork ($$) {
[$self, $intvl, $urls]);
}
watch_fs_init($self) if $self->{mdre};
- PublicInbox::DS->SetPostLoopCallback(sub {});
- PublicInbox::DS->EventLoop until $self->{quit};
+ PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
+ PublicInbox::DS->EventLoop;
_done_for_now($self);
}
my ($self) = @_;
return if $self->{quit};
my $op = shift @{$self->{ops}};
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
# continue existing scan
my $max = 10;