]> Sergey Matveev's repositories - public-inbox.git/commitdiff
watch: support IMAP polling
authorEric Wong <e@yhbt.net>
Sat, 27 Jun 2020 10:03:40 +0000 (10:03 +0000)
committerEric Wong <e@yhbt.net>
Sun, 28 Jun 2020 22:27:17 +0000 (22:27 +0000)
Not all IMAP servers support IDLE, and IDLE may be prohibitively
expensive for some IMAP servers with many inboxes.  So allow
configuring a imap.$IMAP_URL.pollInterval=SECONDS to poll
mailboxes.

We'll also need to poll for NNTP servers in the future.

lib/PublicInbox/WatchMaildir.pm
t/imapd.t

index 431350be277f881f9a2c5e6a22f4c10ce88c6bb2..ac980d9b0f131325ddccbf7cb61dd7466dadea78 100644 (file)
@@ -202,8 +202,9 @@ sub quit {
        if (my $imap_pid = $self->{-imap_pid}) {
                kill('QUIT', $imap_pid);
        }
-       if (my $idle_pids = $self->{idle_pids}) {
-               kill('QUIT', $_) for (keys %$idle_pids);
+       for (qw(idle_pids poll_pids)) {
+               my $pids = $self->{$_} or next;
+               kill('QUIT', $_) for (keys %$pids);
        }
        if (my $idle_mic = $self->{idle_mic}) {
                eval { $idle_mic->done };
@@ -237,12 +238,12 @@ sub imap_section ($) {
 sub cfg_intvl ($$) {
        my ($cfg, $key) = @_;
        defined(my $v = $cfg->{lc($key)}) or return;
-       $v =~ /\A[0-9]+\z/s and return $v + 0;
+       $v =~ /\A[0-9]+(?:\.[0-9]+)?\z/s and return $v + 0;
        if (ref($v) eq 'ARRAY') {
                $v = join(', ', @$v);
                warn "W: $key has multiple values: $v\nW: $key ignored\n";
        } else {
-               warn "W: $key=$v is not an integer value in seconds\n";
+               warn "W: $key=$v is not a numeric value in seconds\n";
        }
 }
 
@@ -460,6 +461,7 @@ sub watch_imap_idle_1 ($$$) {
 sub watch_atfork_child ($) {
        my ($self) = @_;
        delete $self->{idle_pids};
+       delete $self->{poll_pids};
        PublicInbox::DS->Reset;
        PublicInbox::Sigfd::sig_setmask($self->{oldset});
        %SIG = (%SIG, %{$self->{sig}});
@@ -504,6 +506,52 @@ sub event_step {
        goto(&fs_scan_step) if $self->{mdre};
 }
 
+sub watch_imap_fetch_all ($$) {
+       my ($self, $uris) = @_;
+       for my $uri (@$uris) {
+               my $sec = imap_section($uri);
+               my $mic_arg = $self->{mic_arg}->{$sec} or
+                       die "BUG: no Mail::IMAPClient->new arg for $sec";
+               my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next;
+               my $err = imap_fetch_all($self, $mic, $uri);
+               last if $self->{quit};
+               warn $err, "\n" if $err;
+       }
+}
+
+sub imap_fetch_fork ($$$) {
+       my ($self, $intvl, $uris) = @_;
+       return if $self->{quit};
+       $self->{mics} = {}; # going to be forking, so disconnect
+       defined(my $pid = fork) or die "fork: $!";
+       if ($pid == 0) {
+               watch_atfork_child($self);
+               watch_imap_fetch_all($self, $uris);
+               _exit(0);
+       }
+       $self->{poll_pids}->{$pid} = [ $intvl, $uris ];
+       PublicInbox::DS::dwaitpid($pid, \&imap_fetch_reap, $self);
+}
+
+sub imap_fetch_cb ($$$) {
+       my ($self, $intvl, $uris) = @_;
+       sub { imap_fetch_fork($self, $intvl, $uris) };
+}
+
+sub imap_fetch_reap { # PublicInbox::DS::dwaitpid callback
+       my ($self, $pid) = @_;
+       my $intvl_uris = delete $self->{poll_pids}->{$pid} or
+               die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+       return if $self->{quit};
+       my ($intvl, $uris) = @$intvl_uris;
+       if ($?) {
+               warn "W: PID=$pid died: \$?=$?\n",
+                       map { $_->as_string."\n" } @$uris;
+       }
+       warn('I: will check ', $_->as_string, " in ${intvl}s\n") for @$uris;
+       PublicInbox::DS::add_timer($intvl, imap_fetch_cb($self, $intvl, $uris));
+}
+
 sub watch_imap_init ($) {
        my ($self) = @_;
        eval { require PublicInbox::IMAPClient } or
@@ -542,7 +590,13 @@ sub watch_imap_init ($) {
                $self->{idle_todo} = $idle;
                PublicInbox::DS::requeue($self); # ->event_step to fork
        }
-       # TODO: polling
+       return unless scalar keys %$poll;
+       $self->{poll_pids} = {};
+
+       # poll all URIs for a given interval sequentially
+       while (my ($intvl, $uris) = each %$poll) {
+               PublicInbox::DS::requeue(imap_fetch_cb($self, $intvl, $uris));
+       }
 }
 
 sub watch {
index cc87a127851a0d025cd0beebda9657ff4676c102..ee3a3b267671f549b30de54c53aba1f5a28f87d9 100644 (file)
--- a/t/imapd.t
+++ b/t/imapd.t
@@ -443,6 +443,7 @@ ok($mic->logout, 'logged out');
 {
        use_ok 'PublicInbox::WatchMaildir';
        use_ok 'PublicInbox::InboxIdle';
+       my $old_env = { HOME => $ENV{HOME} };
        my $home = "$tmpdir/watch_home";
        mkdir $home or BAIL_OUT $!;
        mkdir "$home/.public-inbox" or BAIL_OUT $!;
@@ -464,13 +465,45 @@ ok($mic->logout, 'logged out');
        my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
        my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
        $cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
-       open my $err, '+>', undef or BAIL_OUT $!;
-       my $w = start_script(['-watch'], undef, { 2 => $err });
+       my $watcherr = "$tmpdir/watcherr";
+       open my $err_wr, '>', $watcherr or BAIL_OUT $!;
+       open my $err, '<', $watcherr or BAIL_OUT $!;
+       my $w = start_script(['-watch'], undef, { 2 => $err_wr });
+
+       diag 'waiting for initial fetch...';
+       PublicInbox::DS->EventLoop;
+       diag 'inbox unlocked on initial fetch, waiting for IDLE';
+
+       tick until (grep(/I: \S+ idling/, <$err>));
+       open my $fh, '<', 't/iso-2202-jp.eml' or BAIL_OUT $!;
+       $old_env->{ORIGINAL_RECIPIENT} = $addr;
+       ok(run_script([qw(-mda --no-precheck)], $old_env, { 0 => $fh }),
+               'delivered a message for IDLE to kick -watch');
+       diag 'waiting for IMAP IDLE wakeup';
+       PublicInbox::DS->SetPostLoopCallback(undef);
+       PublicInbox::DS->EventLoop;
+       diag 'inbox unlocked on IDLE wakeup';
+
+       # try again with polling
+       xsys(qw(git config), "--file=$home/.public-inbox/config",
+               "imap.imap://$ihost:$iport.PollInterval", 0.11) == 0
+               or BAIL_OUT "git config $?";
+       $w->kill('HUP');
+       diag 'waiting for -watch reload + initial fetch';
+       tick until (grep(/I: will check/, <$err>));
+
+       open $fh, '<', 't/psgi_attach.eml' or BAIL_OUT $!;
+       ok(run_script([qw(-mda --no-precheck)], $old_env, { 0 => $fh }),
+               'delivered a message for -watch PollInterval');
+
+       diag 'waiting for PollInterval wakeup';
+       PublicInbox::DS->SetPostLoopCallback(undef);
        PublicInbox::DS->EventLoop;
-       diag 'inbox unlocked';
+       diag 'inbox unlocked (poll)';
        $w->kill;
        $w->join;
        is($?, 0, 'no error in exited -watch process');
+
        $cfg->each_inbox(sub { shift->unsubscribe_unlock('ident') });
        $ii->close;
        PublicInbox::DS->Reset;