-# Copyright (C) 2016-2020 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# ref: https://cr.yp.to/proto/maildir.html
use PublicInbox::DS qw(now);
use PublicInbox::MID qw(mids);
use PublicInbox::ContentHash qw(content_hash);
-use POSIX qw(_exit);
+use PublicInbox::EOFpipe;
+use POSIX qw(_exit WNOHANG);
sub compile_watchheaders ($) {
my ($ibx) = @_;
}
sub new {
- my ($class, $config) = @_;
+ my ($class, $cfg) = @_;
my (%mdmap, $spamc);
my (%imap, %nntp); # url => [inbox objects] or 'watchspam'
# indefinitely...
foreach my $pfx (qw(publicinboxwatch publicinboxlearn)) {
my $k = "$pfx.watchspam";
- defined(my $dirs = $config->{$k}) or next;
+ defined(my $dirs = $cfg->{$k}) or next;
$dirs = PublicInbox::Config::_array($dirs);
for my $dir (@$dirs) {
my $url;
my $k = 'publicinboxwatch.spamcheck';
my $default = undef;
- my $spamcheck = PublicInbox::Spamcheck::get($config, $k, $default);
+ my $spamcheck = PublicInbox::Spamcheck::get($cfg, $k, $default);
$spamcheck = _spamcheck_cb($spamcheck) if $spamcheck;
- $config->each_inbox(sub {
+ $cfg->each_inbox(sub {
# need to make all inboxes writable for spam removal:
my $ibx = $_[0] = PublicInbox::InboxWritable->new($_[0]);
spamcheck => $spamcheck,
mdmap => \%mdmap,
mdre => $mdre,
- config => $config,
+ pi_cfg => $cfg,
imap => scalar keys %imap ? \%imap : undef,
nntp => scalar keys %nntp? \%nntp : undef,
importers => {},
}
sub remove_eml_i { # each_inbox callback
- my ($ibx, $arg) = @_;
- my ($self, $eml, $loc) = @$arg;
+ my ($ibx, $self, $eml, $loc) = @_;
eval {
# try to avoid taking a lock or unnecessary spawning
$path =~ /:2,[A-R]*S[T-Za-z]*\z/ or return;
my $eml = eml_from_path($path) or return;
local $SIG{__WARN__} = warn_ignore_cb();
- $self->{config}->each_inbox(\&remove_eml_i, [ $self, $eml, $path ]);
+ $self->{pi_cfg}->each_inbox(\&remove_eml_i, $self, $eml, $path);
}
sub import_eml ($$$) {
warn "unmappable dir: $1\n";
return;
}
- my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+ my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub {
my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
$warn_cb->($pfx, "path: $path\n", @_);
delete $self->{done_timer};
_done_for_now($self);
};
- my $cb = sub {
+ my $cb = sub { # called by PublicInbox::DirIdle::event_step
_try_path($self, $_[0]->fullname);
$self->{done_timer} //= PublicInbox::DS::requeue($done);
};
# flesh out common IMAP-specific data structures
sub imap_common_init ($) {
my ($self) = @_;
- my $cfg = $self->{config};
+ my $cfg = $self->{pi_cfg};
my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg
for my $url (sort keys %{$self->{imap}}) {
my $uri = PublicInbox::URIimap->new($url);
if (ref($inboxes)) {
for my $ibx (@$inboxes) {
my $eml = PublicInbox::Eml->new($$raw);
- my $x = import_eml($self, $ibx, $eml);
+ import_eml($self, $ibx, $eml);
}
} elsif ($inboxes eq 'watchspam') {
# we don't remove unseen messages
if ($flags =~ /\\Seen\b/) {
local $SIG{__WARN__} = warn_ignore_cb();
my $eml = PublicInbox::Eml->new($raw);
- my $arg = [ $self, $eml, "$url UID:$uid" ];
- $self->{config}->each_inbox(\&remove_eml_i, $arg);
+ $self->{pi_cfg}->each_inbox(\&remove_eml_i,
+ $self, $eml, "$url UID:$uid");
}
} else {
die "BUG: destination unknown $inboxes";
my $key = $req;
$key =~ s/\.PEEK//;
my ($uids, $batch);
- my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+ my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub {
my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
$batch //= '?';
$err = imap_fetch_all($self, $mic, $url);
$err //= imap_idle_once($self, $mic, $intvl, $url);
} else {
- $err = "not connected: $!";
+ $err = "E: not connected: $!";
}
if ($err && !$self->{quit}) {
warn $err, "\n";
sub watch_atfork_parent ($) {
my ($self) = @_;
_done_for_now($self);
+ PublicInbox::Sigfd::block_signals();
}
sub imap_idle_requeue ($) { # DS::add_timer callback
\&imap_idle_requeue, [ $self, $url_intvl ]);
}
+sub reap { # callback for EOFpipe
+ my ($pid, $cb, $self) = @{$_[0]};
+ my $ret = waitpid($pid, 0);
+ if ($ret == $pid) {
+ $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
+ } else {
+ warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
+ }
+}
+
sub imap_idle_fork ($$) {
my ($self, $url_intvl) = @_;
my ($url, $intvl) = @$url_intvl;
+ pipe(my ($r, $w)) or die "pipe: $!";
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) {
+ close $r;
watch_atfork_child($self);
watch_imap_idle_1($self, $url, $intvl);
+ close $w;
_exit(0);
}
$self->{idle_pids}->{$pid} = $url_intvl;
- PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+ PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
}
sub event_step {
return if $self->{quit};
my $idle_todo = $self->{idle_todo};
if ($idle_todo && @$idle_todo) {
- watch_atfork_parent($self);
- while (my $url_intvl = shift(@$idle_todo)) {
- imap_idle_fork($self, $url_intvl);
- }
+ my $oldset = watch_atfork_parent($self);
+ eval {
+ while (my $url_intvl = shift(@$idle_todo)) {
+ imap_idle_fork($self, $url_intvl);
+ }
+ };
+ PublicInbox::Sigfd::sig_setmask($oldset);
+ die $@ if $@;
}
- goto(&fs_scan_step) if $self->{mdre};
+ fs_scan_step($self) if $self->{mdre};
}
sub watch_imap_fetch_all ($$) {
sub poll_fetch_fork ($) { # DS::add_timer callback
my ($self, $intvl, $urls) = @{$_[0]};
return if $self->{quit};
- watch_atfork_parent($self);
- defined(my $pid = fork) or die "fork: $!";
- if ($pid == 0) {
+ pipe(my ($r, $w)) or die "pipe: $!";
+ my $oldset = watch_atfork_parent($self);
+ my $pid = fork;
+ if (defined($pid) && $pid == 0) {
+ close $r;
watch_atfork_child($self);
if ($urls->[0] =~ m!\Aimaps?://!i) {
watch_imap_fetch_all($self, $urls);
} else {
watch_nntp_fetch_all($self, $urls);
}
+ close $w;
_exit(0);
}
+ PublicInbox::Sigfd::sig_setmask($oldset);
+ die "fork: $!" unless defined $pid;
$self->{poll_pids}->{$pid} = [ $intvl, $urls ];
- PublicInbox::DS::dwaitpid($pid, \&poll_fetch_reap, $self);
+ PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
}
-sub poll_fetch_reap { # PublicInbox::DS::dwaitpid callback
+sub poll_fetch_reap {
my ($self, $pid) = @_;
my $intvl_urls = delete $self->{poll_pids}->{$pid} or
die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
# flesh out common NNTP-specific data structures
sub nntp_common_init ($) {
my ($self) = @_;
- my $cfg = $self->{config};
+ my $cfg = $self->{pi_cfg};
my $nn_args = {}; # scheme://authority => Net::NNTP->new arg
for my $url (sort keys %{$self->{nntp}}) {
my $sec = uri_section(uri_new($url));
$beg = $l_art + 1;
warn "I: $url fetching ARTICLE $beg..$end\n";
- my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+ my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
my ($err, $art);
local $SIG{__WARN__} = sub {
my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
}
} elsif ($inboxes eq 'watchspam') {
my $eml = PublicInbox::Eml->new(\$raw);
- my $arg = [ $self, $eml, "$url ARTICLE $art" ];
- $self->{config}->each_inbox(\&remove_eml_i, $arg);
+ $self->{pi_cfg}->each_inbox(\&remove_eml_i,
+ $self, $eml, "$url ARTICLE $art");
} else {
die "BUG: destination unknown $inboxes";
}
}
}
-sub watch {
+sub watch { # main entry point
my ($self, $sig, $oldset) = @_;
$self->{oldset} = $oldset;
$self->{sig} = $sig;
}
watch_fs_init($self) if $self->{mdre};
PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
- PublicInbox::DS->EventLoop;
+ PublicInbox::DS->EventLoop; # calls ->event_step
_done_for_now($self);
}
sub scan {
my ($self, $op) = @_;
push @{$self->{ops}}, $op;
- goto &fs_scan_step;
+ fs_scan_step($self);
}
sub _importer_for {
sub _spamcheck_cb {
my ($sc) = @_;
- sub {
+ sub { # this gets called by (V2Writable||Import)->add
my ($mime, $ibx) = @_;
return if content_exists($ibx, $mime);
my $tmp = '';