if (my $imap_pid = $self->{-imap_pid}) {
kill('QUIT', $imap_pid);
}
- if (my $idle_pids = $self->{idle_pids}) {
- kill('QUIT', $_) for (keys %$idle_pids);
+ for (qw(idle_pids poll_pids)) {
+ my $pids = $self->{$_} or next;
+ kill('QUIT', $_) for (keys %$pids);
}
if (my $idle_mic = $self->{idle_mic}) {
eval { $idle_mic->done };
sub cfg_intvl ($$) {
my ($cfg, $key) = @_;
defined(my $v = $cfg->{lc($key)}) or return;
- $v =~ /\A[0-9]+\z/s and return $v + 0;
+ $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0;
if (ref($v) eq 'ARRAY') {
$v = join(', ', @$v);
warn "W: $key has multiple values: $v\nW: $key ignored\n";
} else {
- warn "W: $key=$v is not an integer value in seconds\n";
+ warn "W: $key=$v is not a numeric value in seconds\n";
}
}
sub watch_atfork_child ($) {
my ($self) = @_;
delete $self->{idle_pids};
+ delete $self->{poll_pids};
PublicInbox::DS->Reset;
PublicInbox::Sigfd::sig_setmask($self->{oldset});
%SIG = (%SIG, %{$self->{sig}});
goto(&fs_scan_step) if $self->{mdre};
}
+sub watch_imap_fetch_all ($$) {
+ my ($self, $uris) = @_;
+ for my $uri (@$uris) {
+ my $sec = imap_section($uri);
+ my $mic_arg = $self->{mic_arg}->{$sec} or
+ die "BUG: no Mail::IMAPClient->new arg for $sec";
+ my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next;
+ my $err = imap_fetch_all($self, $mic, $uri);
+ last if $self->{quit};
+ warn $err, "\n" if $err;
+ }
+}
+
+sub imap_fetch_fork ($$$) {
+ my ($self, $intvl, $uris) = @_;
+ return if $self->{quit};
+ $self->{mics} = {}; # going to be forking, so disconnect
+ defined(my $pid = fork) or die "fork: $!";
+ if ($pid == 0) {
+ watch_atfork_child($self);
+ watch_imap_fetch_all($self, $uris);
+ _exit(0);
+ }
+ $self->{poll_pids}->{$pid} = [ $intvl, $uris ];
+ PublicInbox::DS::dwaitpid($pid, \&imap_fetch_reap, $self);
+}
+
+sub imap_fetch_cb ($$$) {
+ my ($self, $intvl, $uris) = @_;
+ sub { imap_fetch_fork($self, $intvl, $uris) };
+}
+
+sub imap_fetch_reap { # PublicInbox::DS::dwaitpid callback
+ my ($self, $pid) = @_;
+ my $intvl_uris = delete $self->{poll_pids}->{$pid} or
+ die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+ return if $self->{quit};
+ my ($intvl, $uris) = @$intvl_uris;
+ if ($?) {
+ warn "W: PID=$pid died: \$?=$?\n",
+ map { $_->as_string."\n" } @$uris;
+ }
+ warn('I: will check ', $_->as_string, " in ${intvl}s\n") for @$uris;
+ PublicInbox::DS::add_timer($intvl, imap_fetch_cb($self, $intvl, $uris));
+}
+
sub watch_imap_init ($) {
my ($self) = @_;
eval { require PublicInbox::IMAPClient } or
$self->{idle_todo} = $idle;
PublicInbox::DS::requeue($self); # ->event_step to fork
}
- # TODO: polling
+ return unless scalar keys %$poll;
+ $self->{poll_pids} = {};
+
+ # poll all URIs for a given interval sequentially
+ while (my ($intvl, $uris) = each %$poll) {
+ PublicInbox::DS::requeue(imap_fetch_cb($self, $intvl, $uris));
+ }
}
sub watch {
{
use_ok 'PublicInbox::WatchMaildir';
use_ok 'PublicInbox::InboxIdle';
+ my $old_env = { HOME => $ENV{HOME} };
my $home = "$tmpdir/watch_home";
mkdir $home or BAIL_OUT $!;
mkdir "$home/.public-inbox" or BAIL_OUT $!;
my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
- open my $err, '+>', undef or BAIL_OUT $!;
- my $w = start_script(['-watch'], undef, { 2 => $err });
+ my $watcherr = "$tmpdir/watcherr";
+ open my $err_wr, '>', $watcherr or BAIL_OUT $!;
+ open my $err, '<', $watcherr or BAIL_OUT $!;
+ my $w = start_script(['-watch'], undef, { 2 => $err_wr });
+
+ diag 'waiting for initial fetch...';
+ PublicInbox::DS->EventLoop;
+ diag 'inbox unlocked on initial fetch, waiting for IDLE';
+
+ tick until (grep(/I: \S+ idling/, <$err>));
+ open my $fh, '<', 't/iso-2202-jp.eml' or BAIL_OUT $!;
+ $old_env->{ORIGINAL_RECIPIENT} = $addr;
+ ok(run_script([qw(-mda --no-precheck)], $old_env, { 0 => $fh }),
+ 'delivered a message for IDLE to kick -watch');
+ diag 'waiting for IMAP IDLE wakeup';
+ PublicInbox::DS->SetPostLoopCallback(undef);
+ PublicInbox::DS->EventLoop;
+ diag 'inbox unlocked on IDLE wakeup';
+
+ # try again with polling
+ xsys(qw(git config), "--file=$home/.public-inbox/config",
+ "imap.imap://$ihost:$iport.PollInterval", 0.11) == 0
+ or BAIL_OUT "git config $?";
+ $w->kill('HUP');
+ diag 'waiting for -watch reload + initial fetch';
+ tick until (grep(/I: will check/, <$err>));
+
+ open $fh, '<', 't/psgi_attach.eml' or BAIL_OUT $!;
+ ok(run_script([qw(-mda --no-precheck)], $old_env, { 0 => $fh }),
+ 'delivered a message for -watch PollInterval');
+
+ diag 'waiting for PollInterval wakeup';
+ PublicInbox::DS->SetPostLoopCallback(undef);
PublicInbox::DS->EventLoop;
- diag 'inbox unlocked';
+ diag 'inbox unlocked (poll)';
$w->kill;
$w->join;
is($?, 0, 'no error in exited -watch process');
+
$cfg->each_inbox(sub { shift->unsubscribe_unlock('ident') });
$ii->close;
PublicInbox::DS->Reset;