# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# ref: https://cr.yp.to/proto/maildir.html
-# httsp://wiki2.dovecot.org/MailboxFormat/Maildir
+# https://wiki2.dovecot.org/MailboxFormat/Maildir
package PublicInbox::Watch;
use strict;
use v5.10.1;
PublicInbox::DirIdle->new([keys %{$self->{mdmap}}], $cb);
}
-sub imap_import_msg ($$$$$) {
- my ($self, $uri, $uid, $raw, $flags) = @_;
- # our target audience expects LF-only, save storage
- $$raw =~ s/\r\n/\n/sg;
-
- my $inboxes = $self->{imap}->{$$uri};
+sub net_cb { # NetReader::(nntp|imap)_each callback
+ my ($uri, $art, $kw, $eml, $self, $inboxes) = @_;
+ return if grep(/\Adraft\z/, @$kw);
+ local $self->{cur_uid} = $art; # IMAP UID or NNTP article
if (ref($inboxes)) {
- for my $ibx (@$inboxes) {
- my $eml = PublicInbox::Eml->new($$raw);
- import_eml($self, $ibx, $eml);
+ my @ibx = @$inboxes;
+ my $last = pop @ibx;
+ for my $ibx (@ibx) {
+ my $tmp = PublicInbox::Eml->new(\($eml->as_string));
+ import_eml($self, $ibx, $tmp);
}
+ import_eml($self, $last, $eml);
} elsif ($inboxes eq 'watchspam') {
- return if $flags !~ /\\Seen\b/; # don't remove unseen messages
- local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
- my $eml = PublicInbox::Eml->new($raw);
+ if ($uri->scheme =~ /\Aimaps?\z/ && !grep(/\Aseen\z/, @$kw)) {
+ return;
+ }
$self->{pi_cfg}->each_inbox(\&remove_eml_i,
- $self, $eml, "$uri UID:$uid");
+ $self, $eml, "$uri #$art");
} else {
die "BUG: destination unknown $inboxes";
}
}
-sub imap_fetch_all ($$$) {
- my ($self, $mic, $uri) = @_;
- my $sec = uri_section($uri);
- my $mbx = $uri->mailbox;
- $mic->Clear(1); # trim results history
- $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!";
- my ($r_uidval, $r_uidnext);
- for ($mic->Results) {
- /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1;
- /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1;
- last if $r_uidval && $r_uidnext;
- }
- $r_uidval //= $mic->uidvalidity($mbx) //
- return "E: $uri cannot get UIDVALIDITY";
- $r_uidnext //= $mic->uidnext($mbx) //
- return "E: $uri cannot get UIDNEXT";
- my $itrk = PublicInbox::IMAPTracker->new($$uri);
- my ($l_uidval, $l_uid) = $itrk->get_last;
- $l_uidval //= $r_uidval; # first time
- $l_uid //= 1;
- if ($l_uidval != $r_uidval) {
- return "E: $uri UIDVALIDITY mismatch\n".
- "E: local=$l_uidval != remote=$r_uidval";
- }
- my $r_uid = $r_uidnext - 1;
- if ($l_uid != 1 && $l_uid > $r_uid) {
- return "E: $uri local UID exceeds remote ($l_uid > $r_uid)\n".
- "E: $uri strangely, UIDVALIDLITY matches ($l_uidval)\n";
- }
- return if $l_uid >= $r_uid; # nothing to do
-
- warn "I: $uri fetching UID $l_uid:$r_uid\n";
- $mic->Uid(1); # the default, we hope
- my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
- my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
-
- # TODO: FLAGS may be useful for personal use
- my $key = $req;
- $key =~ s/\.PEEK//;
- my ($uids, $batch);
+sub imap_fetch_all ($$) {
+ my ($self, $uri) = @_;
my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
+ $self->{incremental} = 1;
+ $self->{on_commit} = [ \&_done_for_now, $self ];
+ local $self->{cur_uid};
local $SIG{__WARN__} = sub {
- my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
- $batch //= '?';
- $warn_cb->("$pfx$uri UID:$batch\n", @_);
+ my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : '';
+ my $uid = $self->{cur_uid};
+ $warn_cb->("$pfx$uri", $uid ? ("UID:$uid") : (), "\n", @_);
};
- my $err;
- do {
- # I wish "UID FETCH $START:*" could work, but:
- # 1) servers do not need to return results in any order
- # 2) Mail::IMAPClient doesn't offer a streaming API
- $uids = $mic->search("UID $l_uid:*") or
- return "E: $uri UID SEARCH $l_uid:* error: $!";
- return if scalar(@$uids) == 0;
-
- # RFC 3501 doesn't seem to indicate order of UID SEARCH
- # responses, so sort it ourselves. Order matters so
- # IMAPTracker can store the newest UID.
- @$uids = sort { $a <=> $b } @$uids;
-
- # Did we actually get new messages?
- return if $uids->[0] < $l_uid;
-
- $l_uid = $uids->[-1] + 1; # for next search
- my $last_uid;
- my $n = $self->{max_batch};
-
- while (scalar @$uids) {
- if (--$n < 0) {
- _done_for_now($self);
- $itrk->update_last($r_uidval, $last_uid);
- $n = $self->{max_batch};
- }
- my @batch = splice(@$uids, 0, $bs);
- $batch = join(',', @batch);
- local $0 = "UID:$batch $mbx $sec";
- my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
- unless ($r) { # network error?
- $err = "E: $uri UID FETCH $batch error: $!";
- last;
- }
- for my $uid (@batch) {
- # messages get deleted, so holes appear
- my $per_uid = delete $r->{$uid} // next;
- my $raw = delete($per_uid->{$key}) // next;
- my $fl = $per_uid->{FLAGS} // '';
- imap_import_msg($self, $uri, $uid, \$raw, $fl);
- $last_uid = $uid;
- last if $self->{quit};
- }
- last if $self->{quit};
- }
- _done_for_now($self);
- $itrk->update_last($r_uidval, $last_uid);
- } until ($err || $self->{quit});
- $err;
+ PublicInbox::NetReader::imap_each($self, $uri, \&net_cb, $self,
+ $self->{imap}->{$$uri});
}
sub imap_idle_once ($$$$) {
$mic //= PublicInbox::IMAPClient->new(%$mic_arg);
my $err;
if ($mic && $mic->IsConnected) {
- $err = imap_fetch_all($self, $mic, $uri);
- $err //= imap_idle_once($self, $mic, $intvl, $uri);
+ local $self->{mics_cached}->{$sec} = $mic;
+ my $m = imap_fetch_all($self, $uri);
+ $m == $mic or die "BUG: wrong mic";
+ $mic->IsConnected and
+ $err = imap_idle_once($self, $mic, $intvl, $uri)
} else {
$err = "E: not connected: $!";
}
sub watch_imap_fetch_all ($$) {
my ($self, $uris) = @_;
for my $uri (@$uris) {
- my $sec = uri_section($uri);
- my $mic_arg = $self->{mic_arg}->{$sec} or
- die "BUG: no Mail::IMAPClient->new arg for $sec";
- my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next;
- my $err = imap_fetch_all($self, $mic, $uri);
+ imap_fetch_all($self, $uri);
last if $self->{quit};
- warn $err, "\n" if $err;
}
}
sub watch_nntp_fetch_all ($$) {
my ($self, $uris) = @_;
- for my $uri (@$uris) {
- my $sec = uri_section($uri);
- my $nn_arg = $self->{nn_arg}->{$sec} or
- die "BUG: no Net::NNTP->new arg for $sec";
- my $nntp_opt = $self->{nntp_opt}->{$sec};
- my $nn = nn_new($nn_arg, $nntp_opt, $uri);
- unless ($nn) {
- warn "E: $uri: \$!=$!\n";
- next;
- }
- last if $self->{quit};
- if (my $postconn = $nntp_opt->{-postconn}) {
- for my $m_arg (@$postconn) {
- my ($method, @args) = @$m_arg;
- $nn->$method(@args) and next;
- warn "E: <$uri> $method failed\n";
- $nn = undef;
- last;
- }
- }
+ $self->{incremental} = 1;
+ $self->{on_commit} = [ \&_done_for_now, $self ];
+ my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
+ local $self->{cur_uid};
+ my $uri = '';
+ local $SIG{__WARN__} = sub {
+ my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : '';
+ my $art = $self->{cur_uid};
+ $warn_cb->("$pfx$uri", $art ? ("ARTICLE $art") : (), "\n", @_);
+ };
+ for $uri (@$uris) {
+ PublicInbox::NetReader::nntp_each($self, $uri, \&net_cb, $self,
+ $self->{nntp}->{$$uri});
last if $self->{quit};
- if ($nn) {
- my $err = nntp_fetch_all($self, $nn, $uri);
- warn $err, "\n" if $err;
- }
}
}
sub watch_imap_init ($$) {
my ($self, $poll) = @_;
- my $mics = imap_common_init($self); # read args from config
+ my $mics = PublicInbox::NetReader::imap_common_init($self);
my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
for my $uri (@{$self->{imap_order}}) {
my $sec = uri_section($uri);
}
}
-sub nntp_fetch_all ($$$) {
- my ($self, $nn, $uri) = @_;
- my ($group, $num_a, $num_b) = $uri->group;
- my $sec = uri_section($uri);
- my ($nr, $beg, $end) = $nn->group($group);
- unless (defined($nr)) {
- chomp(my $msg = $nn->message);
- return "E: GROUP $group <$sec> $msg";
- }
-
- # IMAPTracker is also used for tracking NNTP, UID == article number
- # LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's
- # expensive. So we assume newsgroups don't change:
- my $itrk = PublicInbox::IMAPTracker->new($$uri);
- my (undef, $l_art) = $itrk->get_last;
- $l_art //= $beg; # initial import
-
- # allow users to specify articles to refetch
- # cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt
- # nntp://example.com/inbox.foo/$num_a-$num_b
- $l_art = $num_a if defined($num_a) && $num_a < $l_art;
- $end = $num_b if defined($num_b) && $num_b < $end;
-
- return if $l_art >= $end; # nothing to do
- $beg = $l_art + 1;
-
- warn "I: $uri fetching ARTICLE $beg..$end\n";
- my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
- my ($err, $art);
- local $SIG{__WARN__} = sub {
- my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
- $warn_cb->("$pfx$uri ", $art ? ("ARTICLE $art") : (), "\n", @_);
- };
- my $inboxes = $self->{nntp}->{$$uri};
- my $last_art;
- my $n = $self->{max_batch};
- for ($beg..$end) {
- last if $self->{quit};
- $art = $_;
- if (--$n < 0) {
- _done_for_now($self);
- $itrk->update_last(0, $last_art);
- $n = $self->{max_batch};
- }
- my $raw = $nn->article($art);
- unless (defined($raw)) {
- my $msg = $nn->message;
- if ($nn->code == 421) { # pseudo response from Net::Cmd
- $err = "E: $msg";
- last;
- } else { # probably just a deleted message (spam)
- warn "W: $msg";
- next;
- }
- }
- s/\r\n/\n/ for @$raw;
- $raw = join('', @$raw);
- if (ref($inboxes)) {
- for my $ibx (@$inboxes) {
- my $eml = PublicInbox::Eml->new($raw);
- import_eml($self, $ibx, $eml);
- }
- } elsif ($inboxes eq 'watchspam') {
- my $eml = PublicInbox::Eml->new(\$raw);
- $self->{pi_cfg}->each_inbox(\&remove_eml_i,
- $self, $eml, "$uri ARTICLE $art");
- } else {
- die "BUG: destination unknown $inboxes";
- }
- $last_art = $art;
- }
- _done_for_now($self);
- $itrk->update_last(0, $last_art);
- $err;
-}
-
sub watch_nntp_init ($$) {
my ($self, $poll) = @_;
- nntp_common_init($self); # read args from config
+ PublicInbox::NetReader::nntp_common_init($self);
for my $uri (@{$self->{nntp_order}}) {
my $sec = uri_section($uri);
my $intvl = $self->{nntp_opt}->{$sec}->{pollInterval};