-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# ref: https://cr.yp.to/proto/maildir.html
use PublicInbox::NetReader;
use PublicInbox::Filter::Base qw(REJECT);
use PublicInbox::Spamcheck;
-use PublicInbox::Sigfd;
-use PublicInbox::DS qw(now add_timer);
+use PublicInbox::DS qw(now add_timer awaitpid);
use PublicInbox::MID qw(mids);
use PublicInbox::ContentHash qw(content_hash);
-use PublicInbox::EOFpipe;
use POSIX qw(_exit WNOHANG);
sub compile_watchheaders ($) {
return unless $self->{quit};
# don't have reliable wakeups, keep signalling
- my $done = 1;
- for (qw(idle_pids poll_pids)) {
- my $pids = $self->{$_} or next;
- for (keys %$pids) {
- $done = undef if kill('QUIT', $_);
- }
- }
- $done;
+ my $live = grep { kill('QUIT', $_) } keys %{$self->{pids}};
+ add_timer(0.01, \&quit_done, $self) if $live;
+ $live == 0;
}
sub quit {
};
require PublicInbox::DirIdle;
# inotify_create + EPOLL_CTL_ADD
- PublicInbox::DirIdle->new([keys %{$self->{mdmap}}], $cb);
+ my $dir_idle = PublicInbox::DirIdle->new($cb);
+ $dir_idle->add_watches([keys %{$self->{mdmap}}]);
}
sub net_cb { # NetReader::(nntp|imap)_each callback
my ($self, $mic, $intvl, $uri) = @_;
my $i = $intvl //= (29 * 60);
my $end = now() + $intvl;
- warn "I: $uri idling for ${intvl}s\n";
+ warn "# $uri idling for ${intvl}s\n";
local $0 = "IDLE $0";
+ return if $self->{quit};
unless ($mic->idle) {
return if $self->{quit};
return "E: IDLE failed on $uri: $!";
sub watch_atfork_child ($) {
my ($self) = @_;
- delete $self->{idle_pids};
- delete $self->{poll_pids};
+ delete $self->{pids};
delete $self->{opendirs};
PublicInbox::DS->Reset;
my $sig = delete $self->{sig};
PublicInbox::DS::sig_setmask($self->{oldset});
}
-sub watch_atfork_parent ($) {
- my ($self) = @_;
- _done_for_now($self);
- PublicInbox::DS::block_signals();
-}
+sub watch_atfork_parent ($) { _done_for_now($_[0]) }
sub imap_idle_requeue { # DS::add_timer callback
- my ($self, $uri_intvl) = @_;
+ my ($self, $uri, $intvl) = @_;
return if $self->{quit};
- push @{$self->{idle_todo}}, $uri_intvl;
+ push @{$self->{idle_todo}}, $uri, $intvl;
event_step($self);
}
-sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
- my ($self, $pid) = @_;
- my $uri_intvl = delete $self->{idle_pids}->{$pid} or
- die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
-
- my ($uri, $intvl) = @$uri_intvl;
+sub imap_idle_reap { # awaitpid callback
+ my ($pid, $self, $uri, $intvl) = @_;
+ delete $self->{pids}->{$pid};
return if $self->{quit};
warn "W: PID=$pid on $uri died: \$?=$?\n" if $?;
- add_timer(60, \&imap_idle_requeue, $self, $uri_intvl);
-}
-
-sub reap { # callback for EOFpipe
- my ($pid, $cb, $self) = @{$_[0]};
- my $ret = waitpid($pid, 0);
- if ($ret == $pid) {
- $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
- } else {
- warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
- }
+ add_timer(60, \&imap_idle_requeue, $self, $uri, $intvl);
}
-sub imap_idle_fork ($$) {
- my ($self, $uri_intvl) = @_;
- my ($uri, $intvl) = @$uri_intvl;
- pipe(my ($r, $w)) or die "pipe: $!";
+sub imap_idle_fork {
+ my ($self, $uri, $intvl) = @_;
+ return if $self->{quit};
my $seed = rand(0xffffffff);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
- close $r;
watch_atfork_child($self);
watch_imap_idle_1($self, $uri, $intvl);
- close $w;
_exit(0);
}
- $self->{idle_pids}->{$pid} = $uri_intvl;
- PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
+ $self->{pids}->{$pid} = undef;
+ awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl);
}
sub event_step {
return if $self->{quit};
my $idle_todo = $self->{idle_todo};
if ($idle_todo && @$idle_todo) {
- my $oldset = watch_atfork_parent($self);
+ watch_atfork_parent($self);
eval {
- while (my $uri_intvl = shift(@$idle_todo)) {
- imap_idle_fork($self, $uri_intvl);
+ while (my ($uri, $intvl) = splice(@$idle_todo, 0, 2)) {
+ imap_idle_fork($self, $uri, $intvl);
}
};
- PublicInbox::DS::sig_setmask($oldset);
die $@ if $@;
}
fs_scan_step($self) if $self->{mdre};
sub poll_fetch_fork { # DS::add_timer callback
my ($self, $intvl, $uris) = @_;
return if $self->{quit};
- pipe(my ($r, $w)) or die "pipe: $!";
- my $oldset = watch_atfork_parent($self);
+ watch_atfork_parent($self);
+ my @nntp;
+ my @imap = grep { # push() always returns > 0
+ $_->scheme =~ m!\Aimaps?!i ? 1 : (push(@nntp, $_) < 0)
+ } @$uris;
my $seed = rand(0xffffffff);
- my $pid = fork;
- if (defined($pid) && $pid == 0) {
+ my $pid = fork // die "fork: $!";
+ if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
- close $r;
watch_atfork_child($self);
- if ($uris->[0]->scheme =~ m!\Aimaps?!i) {
- watch_imap_fetch_all($self, $uris);
- } else {
- watch_nntp_fetch_all($self, $uris);
- }
- close $w;
+ watch_imap_fetch_all($self, \@imap) if @imap;
+ watch_nntp_fetch_all($self, \@nntp) if @nntp;
_exit(0);
}
- PublicInbox::DS::sig_setmask($oldset);
- die "fork: $!" unless defined $pid;
- $self->{poll_pids}->{$pid} = [ $intvl, $uris ];
- PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
+ $self->{pids}->{$pid} = undef;
+ awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris);
}
-sub poll_fetch_reap {
- my ($self, $pid) = @_;
- my $intvl_uris = delete $self->{poll_pids}->{$pid} or
- die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+sub poll_fetch_reap { # awaitpid callback
+ my ($pid, $self, $intvl, $uris) = @_;
+ delete $self->{pids}->{$pid};
return if $self->{quit};
- my ($intvl, $uris) = @$intvl_uris;
if ($?) {
warn "W: PID=$pid died: \$?=$?\n", map { "$_\n" } @$uris;
}
- warn("I: will check $_ in ${intvl}s\n") for @$uris;
+ warn("# will check $_ in ${intvl}s\n") for @$uris;
add_timer($intvl, \&poll_fetch_fork, $self, $intvl, $uris);
}
sub watch_imap_init ($$) {
my ($self, $poll) = @_;
my $mics = PublicInbox::NetReader::imap_common_init($self);
- my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
+ my $idle = []; # [ uri1, intvl1, uri2, intvl2 ]
for my $uri (@{$self->{imap_order}}) {
my $sec = uri_section($uri);
my $mic = $mics->{$sec};
my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval};
if ($mic->has_capability('IDLE') && !$intvl) {
$intvl = $self->{cfg_opt}->{$sec}->{idleInterval};
- push @$idle, [ $uri, $intvl // () ];
+ push @$idle, $uri, $intvl;
} else {
push @{$poll->{$intvl || 120}}, $uri;
}
}
watch_fs_init($self) if $self->{mdre};
PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
- PublicInbox::DS->EventLoop; # calls ->event_step
+ PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step
_done_for_now($self);
}