use strict;
use warnings;
use PublicInbox::Eml;
-use PublicInbox::InboxWritable;
+use PublicInbox::InboxWritable qw(eml_from_path);
use PublicInbox::Filter::Base qw(REJECT);
use PublicInbox::Spamcheck;
use PublicInbox::Sigfd;
use PublicInbox::MID qw(mids);
use PublicInbox::ContentHash qw(content_hash);
use POSIX qw(_exit);
-*mime_from_path = \&PublicInbox::InboxWritable::mime_from_path;
sub compile_watchheaders ($) {
my ($ibx) = @_;
sub _done_for_now {
my ($self) = @_;
- my $importers = $self->{importers};
- foreach my $im (values %$importers) {
- $im->done;
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
+ for my $im (values %{$self->{importers}}) {
+ next if !$im; # $im may be undef during cleanup
+ eval { $im->done };
+ warn "$im->{ibx}->{name} ->done: $@\n" if $@;
}
}
$im->remove($eml, 'spam');
if (my $scrub = $ibx->filter($im)) {
my $scrubbed = $scrub->scrub($eml, 1);
- $scrubbed or return;
- $scrubbed == REJECT() and return;
- $im->remove($scrubbed, 'spam');
+ if ($scrubbed && $scrubbed != REJECT) {
+ $im->remove($scrubbed, 'spam');
+ }
}
};
- warn "error removing spam at: $loc from $ibx->{name}: $@\n" if $@;
+ if ($@) {
+ warn "error removing spam at: $loc from $ibx->{name}: $@\n";
+ _done_for_now($self);
+ }
}
sub _remove_spam {
my ($self, $path) = @_;
# path must be marked as (S)een
$path =~ /:2,[A-R]*S[T-Za-z]*\z/ or return;
- my $eml = mime_from_path($path) or return;
+ my $eml = eml_from_path($path) or return;
$self->{config}->each_inbox(\&remove_eml_i, [ $self, $eml, $path ]);
}
sub import_eml ($$$) {
my ($self, $ibx, $eml) = @_;
- my $im = _importer_for($self, $ibx);
# any header match means it's eligible for the inbox:
if (my $watch_hdrs = $ibx->{-watchheaders}) {
}
return unless $ok;
}
-
- if (my $scrub = $ibx->filter($im)) {
- my $ret = $scrub->scrub($eml) or return;
- $ret == REJECT() and return;
- $eml = $ret;
+ eval {
+ my $im = _importer_for($self, $ibx);
+ if (my $scrub = $ibx->filter($im)) {
+ my $scrubbed = $scrub->scrub($eml) or return;
+ $scrubbed == REJECT and return;
+ $eml = $scrubbed;
+ }
+ $im->add($eml, $self->{spamcheck});
+ };
+ if ($@) {
+ warn "$ibx->{name} add failed: $@\n";
+ _done_for_now($self);
}
- $im->add($eml, $self->{spamcheck});
}
sub _try_path {
return _remove_spam($self, $path);
}
foreach my $ibx (@$inboxes) {
- my $eml = mime_from_path($path) or next;
+ my $eml = eml_from_path($path) or next;
import_eml($self, $ibx, $eml);
}
}
+sub quit_done ($) {
+ my ($self) = @_;
+ return unless $self->{quit};
+
+ # don't have reliable wakeups, keep signalling
+ my $done = 1;
+ for (qw(idle_pids poll_pids)) {
+ my $pids = $self->{$_} or next;
+ for (keys %$pids) {
+ $done = undef if kill('QUIT', $_);
+ }
+ }
+ $done;
+}
+
sub quit {
my ($self) = @_;
$self->{quit} = 1;
%{$self->{opendirs}} = ();
_done_for_now($self);
- if (my $imap_pid = $self->{-imap_pid}) {
- kill('QUIT', $imap_pid);
- }
- for (qw(idle_pids poll_pids)) {
- my $pids = $self->{$_} or next;
- kill('QUIT', $_) for (keys %$pids);
- }
+ quit_done($self);
if (my $idle_mic = $self->{idle_mic}) {
eval { $idle_mic->done };
if ($@) {
}
$self->{idle_mic} = $mic; # for ->quit
my @res;
- until ($self->{quit} || grep(/^\* [0-9]+ EXISTS/, @res) || $i <= 0) {
+ until ($self->{quit} || !$mic->IsConnected ||
+ grep(/^\* [0-9]+ EXISTS/, @res) || $i <= 0) {
@res = $mic->idle_data($i);
$i = $end - now();
}
local $0 = $uri->mailbox." $sec";
until ($self->{quit}) {
$mic //= PublicInbox::IMAPClient->new(%$mic_arg);
- my $err = imap_fetch_all($self, $mic, $url);
- $err //= imap_idle_once($self, $mic, $intvl, $url);
+ my $err;
+ if ($mic && $mic->IsConnected) {
+ $err = imap_fetch_all($self, $mic, $url);
+ $err //= imap_idle_once($self, $mic, $intvl, $url);
+ } else {
+ $err = "not connected: $!";
+ }
if ($err && !$self->{quit}) {
warn $err, "\n";
$mic = undef;
_done_for_now($self);
}
+sub imap_idle_requeue ($) { # DS::add_timer callback
+ my ($self, $url_intvl) = @{$_[0]};
+ return if $self->{quit};
+ push @{$self->{idle_todo}}, $url_intvl;
+ event_step($self);
+}
+
sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
my ($self, $pid) = @_;
my $url_intvl = delete $self->{idle_pids}->{$pid} or
my ($url, $intvl) = @$url_intvl;
return if $self->{quit};
warn "W: PID=$pid on $url died: \$?=$?\n" if $?;
- push @{$self->{idle_todo}}, $url_intvl;
- PubicInbox::DS::requeue($self); # call ->event_step to respawn
+ PublicInbox::DS::add_timer(60,
+ \&imap_idle_requeue, [ $self, $url_intvl ]);
}
sub imap_idle_fork ($$) {
[$self, $intvl, $urls]);
}
watch_fs_init($self) if $self->{mdre};
- PublicInbox::DS->SetPostLoopCallback(sub {});
- PublicInbox::DS->EventLoop until $self->{quit};
+ PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
+ PublicInbox::DS->EventLoop;
_done_for_now($self);
}
my ($self) = @_;
return if $self->{quit};
my $op = shift @{$self->{ops}};
+ local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
# continue existing scan
my $max = 10;