From: Eric Wong Date: Wed, 24 Feb 2021 11:31:53 +0000 (+0600) Subject: watch: switch IMAP and NNTP fetch loops to NetReader X-Git-Tag: v1.7.0~1065 X-Git-Url: http://www.git.stargrave.org/?p=public-inbox.git;a=commitdiff_plain;h=48bae7dc634583b58fd747bd557392cac60123b3 watch: switch IMAP and NNTP fetch loops to NetReader NetReader::_each were based on the -watch code they now replace. v2: do not warn on EINTR if user quit to fix occasional test failure in t/imapd.t --- diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm index 2a453217..785211bf 100644 --- a/lib/PublicInbox/NetReader.pm +++ b/lib/PublicInbox/NetReader.pm @@ -366,6 +366,13 @@ sub _imap_do_msg ($$$$$) { $eml_cb->($uri, $uid, $kw, PublicInbox::Eml->new($raw), @args); } +sub run_commit_cb ($) { + my ($self) = @_; + my $cmt_cb_args = $self->{on_commit} or return; + my ($cb, @args) = @$cmt_cb_args; + $cb->(@args); +} + sub _imap_fetch_all ($$$) { my ($self, $mic, $uri) = @_; my $sec = uri_section($uri); @@ -414,8 +421,10 @@ sub _imap_fetch_all ($$$) { # I wish "UID FETCH $START:*" could work, but: # 1) servers do not need to return results in any order # 2) Mail::IMAPClient doesn't offer a streaming API - $uids = $mic->search("UID $l_uid:*") or + unless ($uids = $mic->search("UID $l_uid:*")) { + return if $!{EINTR} && $self->{quit}; return "E: $uri UID SEARCH $l_uid:* error: $!"; + } return if scalar(@$uids) == 0; # RFC 3501 doesn't seem to indicate order of UID SEARCH @@ -437,6 +446,7 @@ sub _imap_fetch_all ($$$) { local $0 = "UID:$batch $mbx $sec"; my $r = $mic->fetch_hash($batch, $req, 'FLAGS'); unless ($r) { # network error? + last if $!{EINTR} && $self->{quit}; $err = "E: $uri UID FETCH $batch error: $!"; last; } @@ -451,6 +461,7 @@ sub _imap_fetch_all ($$$) { } last if $self->{quit}; } + run_commit_cb($self); $itrk->update_last($r_uidval, $last_uid) if $itrk; } until ($err || $self->{quit}); $err; @@ -490,7 +501,7 @@ sub imap_each { local $self->{eml_each} = [ $eml_cb, @args ]; $err = _imap_fetch_all($self, $mic, $uri); } else { - $err = "E: not connected: $!"; + $err = "E: <$uri> not connected: $!"; } warn $err if $err; $mic; @@ -555,6 +566,7 @@ sub _nntp_fetch_all ($$$) { last if $self->{quit}; $art = $_; if (--$n < 0) { + run_commit_cb($self); $itrk->update_last(0, $last_art) if $itrk; $n = $self->{max_batch}; } @@ -575,6 +587,7 @@ sub _nntp_fetch_all ($$$) { $eml_cb->($uri, $art, [], PublicInbox::Eml->new(\$raw), @args); $last_art = $art; } + run_commit_cb($self); $itrk->update_last(0, $last_art) if $itrk; $err; } @@ -585,12 +598,13 @@ sub nntp_each { my $sec = uri_section($uri); local $0 = $uri->group ." $sec"; my $nn = nn_get($self, $uri); + return if $self->{quit}; my $err; if ($nn) { local $self->{eml_each} = [ $eml_cb, @args ]; $err = _nntp_fetch_all($self, $nn, $uri); } else { - $err = "E: not connected: $!"; + $err = "E: <$uri> not connected: $!"; } warn $err if $err; $nn; diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 4b009a28..0b72bd16 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -309,102 +309,38 @@ sub imap_import_msg ($$$$$) { } } -sub imap_fetch_all ($$$) { - my ($self, $mic, $uri) = @_; - my $sec = uri_section($uri); - my $mbx = $uri->mailbox; - $mic->Clear(1); # trim results history - $mic->examine($mbx) or return "E: EXAMINE $mbx ($sec) failed: $!"; - my ($r_uidval, $r_uidnext); - for ($mic->Results) { - /^\* OK \[UIDVALIDITY ([0-9]+)\].*/ and $r_uidval = $1; - /^\* OK \[UIDNEXT ([0-9]+)\].*/ and $r_uidnext = $1; - last if $r_uidval && $r_uidnext; - } - $r_uidval //= $mic->uidvalidity($mbx) // - return "E: $uri cannot get UIDVALIDITY"; - $r_uidnext //= $mic->uidnext($mbx) // - return "E: $uri cannot get UIDNEXT"; - my $itrk = PublicInbox::IMAPTracker->new($$uri); - my ($l_uidval, $l_uid) = $itrk->get_last; - $l_uidval //= $r_uidval; # first time - $l_uid //= 1; - if ($l_uidval != $r_uidval) { - return "E: $uri UIDVALIDITY mismatch\n". - "E: local=$l_uidval != remote=$r_uidval"; - } - my $r_uid = $r_uidnext - 1; - if ($l_uid != 1 && $l_uid > $r_uid) { - return "E: $uri local UID exceeds remote ($l_uid > $r_uid)\n". - "E: $uri strangely, UIDVALIDLITY matches ($l_uidval)\n"; - } - return if $l_uid >= $r_uid; # nothing to do - - warn "I: $uri fetching UID $l_uid:$r_uid\n"; - $mic->Uid(1); # the default, we hope - my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1; - my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK'; - - # TODO: FLAGS may be useful for personal use - my $key = $req; - $key =~ s/\.PEEK//; - my ($uids, $batch); +sub net_cb { # NetReader::(nntp|imap)_each callback + my ($uri, $art, $kw, $eml, $self, $inboxes) = @_; + local $self->{cur_uid} = $art; # IMAP UID or NNTP article + if (ref($inboxes)) { + my @ibx = @$inboxes; + my $last = pop @ibx; + for my $ibx (@ibx) { + my $tmp = PublicInbox::Eml->new(\($eml->as_string)); + import_eml($self, $ibx, $tmp); + } + import_eml($self, $last, $eml); + } elsif ($inboxes eq 'watchspam') { + $self->{pi_cfg}->each_inbox(\&remove_eml_i, + $self, $eml, "$uri #$art"); + } else { + die "BUG: destination unknown $inboxes"; + } +} + +sub imap_fetch_all ($$) { + my ($self, $uri) = @_; my $warn_cb = $SIG{__WARN__} || \&CORE::warn; + $self->{incremental} = 1; + $self->{on_commit} = [ \&_done_for_now, $self ]; + local $self->{cur_uid}; local $SIG{__WARN__} = sub { - my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : ''; - $batch //= '?'; - $warn_cb->("$pfx$uri UID:$batch\n", @_); + my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : ''; + my $uid = $self->{cur_uid}; + $warn_cb->("$pfx$uri", $uid ? ("UID:$uid") : (), "\n", @_); }; - my $err; - do { - # I wish "UID FETCH $START:*" could work, but: - # 1) servers do not need to return results in any order - # 2) Mail::IMAPClient doesn't offer a streaming API - $uids = $mic->search("UID $l_uid:*") or - return "E: $uri UID SEARCH $l_uid:* error: $!"; - return if scalar(@$uids) == 0; - - # RFC 3501 doesn't seem to indicate order of UID SEARCH - # responses, so sort it ourselves. Order matters so - # IMAPTracker can store the newest UID. - @$uids = sort { $a <=> $b } @$uids; - - # Did we actually get new messages? - return if $uids->[0] < $l_uid; - - $l_uid = $uids->[-1] + 1; # for next search - my $last_uid; - my $n = $self->{max_batch}; - - while (scalar @$uids) { - if (--$n < 0) { - _done_for_now($self); - $itrk->update_last($r_uidval, $last_uid); - $n = $self->{max_batch}; - } - my @batch = splice(@$uids, 0, $bs); - $batch = join(',', @batch); - local $0 = "UID:$batch $mbx $sec"; - my $r = $mic->fetch_hash($batch, $req, 'FLAGS'); - unless ($r) { # network error? - $err = "E: $uri UID FETCH $batch error: $!"; - last; - } - for my $uid (@batch) { - # messages get deleted, so holes appear - my $per_uid = delete $r->{$uid} // next; - my $raw = delete($per_uid->{$key}) // next; - my $fl = $per_uid->{FLAGS} // ''; - imap_import_msg($self, $uri, $uid, \$raw, $fl); - $last_uid = $uid; - last if $self->{quit}; - } - last if $self->{quit}; - } - _done_for_now($self); - $itrk->update_last($r_uidval, $last_uid); - } until ($err || $self->{quit}); - $err; + PublicInbox::NetReader::imap_each($self, $uri, \&net_cb, $self, + $self->{imap}->{$$uri}); } sub imap_idle_once ($$$$) { @@ -444,8 +380,11 @@ sub watch_imap_idle_1 ($$$) { $mic //= PublicInbox::IMAPClient->new(%$mic_arg); my $err; if ($mic && $mic->IsConnected) { - $err = imap_fetch_all($self, $mic, $uri); - $err //= imap_idle_once($self, $mic, $intvl, $uri); + local $self->{mics_cached}->{$sec} = $mic; + my $m = imap_fetch_all($self, $uri); + $m == $mic or die "BUG: wrong mic"; + $mic->IsConnected and + $err = imap_idle_once($self, $mic, $intvl, $uri) } else { $err = "E: not connected: $!"; } @@ -540,43 +479,27 @@ sub event_step { sub watch_imap_fetch_all ($$) { my ($self, $uris) = @_; for my $uri (@$uris) { - my $sec = uri_section($uri); - my $mic_arg = $self->{mic_arg}->{$sec} or - die "BUG: no Mail::IMAPClient->new arg for $sec"; - my $mic = PublicInbox::IMAPClient->new(%$mic_arg) or next; - my $err = imap_fetch_all($self, $mic, $uri); + imap_fetch_all($self, $uri); last if $self->{quit}; - warn $err, "\n" if $err; } } sub watch_nntp_fetch_all ($$) { my ($self, $uris) = @_; - for my $uri (@$uris) { - my $sec = uri_section($uri); - my $nn_arg = $self->{nn_arg}->{$sec} or - die "BUG: no Net::NNTP->new arg for $sec"; - my $nntp_opt = $self->{nntp_opt}->{$sec}; - my $nn = nn_new($nn_arg, $nntp_opt, $uri); - unless ($nn) { - warn "E: $uri: \$!=$!\n"; - next; - } - last if $self->{quit}; - if (my $postconn = $nntp_opt->{-postconn}) { - for my $m_arg (@$postconn) { - my ($method, @args) = @$m_arg; - $nn->$method(@args) and next; - warn "E: <$uri> $method failed\n"; - $nn = undef; - last; - } - } + $self->{incremental} = 1; + $self->{on_commit} = [ \&_done_for_now, $self ]; + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; + local $self->{cur_uid}; + my $uri = ''; + local $SIG{__WARN__} = sub { + my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : ''; + my $art = $self->{cur_uid}; + $warn_cb->("$pfx$uri", $art ? ("ARTICLE $art") : (), "\n", @_); + }; + for $uri (@$uris) { + PublicInbox::NetReader::nntp_each($self, $uri, \&net_cb, $self, + $self->{nntp}->{$$uri}); last if $self->{quit}; - if ($nn) { - my $err = nntp_fetch_all($self, $nn, $uri); - warn $err, "\n" if $err; - } } } @@ -640,82 +563,6 @@ sub watch_imap_init ($$) { } } -sub nntp_fetch_all ($$$) { - my ($self, $nn, $uri) = @_; - my ($group, $num_a, $num_b) = $uri->group; - my $sec = uri_section($uri); - my ($nr, $beg, $end) = $nn->group($group); - unless (defined($nr)) { - chomp(my $msg = $nn->message); - return "E: GROUP $group <$sec> $msg"; - } - - # IMAPTracker is also used for tracking NNTP, UID == article number - # LIST.ACTIVE can get the equivalent of UIDVALIDITY, but that's - # expensive. So we assume newsgroups don't change: - my $itrk = PublicInbox::IMAPTracker->new($$uri); - my (undef, $l_art) = $itrk->get_last; - $l_art //= $beg; # initial import - - # allow users to specify articles to refetch - # cf. https://tools.ietf.org/id/draft-gilman-news-url-01.txt - # nntp://example.com/inbox.foo/$num_a-$num_b - $l_art = $num_a if defined($num_a) && $num_a < $l_art; - $end = $num_b if defined($num_b) && $num_b < $end; - - return if $l_art >= $end; # nothing to do - $beg = $l_art + 1; - - warn "I: $uri fetching ARTICLE $beg..$end\n"; - my $warn_cb = $SIG{__WARN__} || \&CORE::warn; - my ($err, $art); - local $SIG{__WARN__} = sub { - my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : ''; - $warn_cb->("$pfx$uri ", $art ? ("ARTICLE $art") : (), "\n", @_); - }; - my $inboxes = $self->{nntp}->{$$uri}; - my $last_art; - my $n = $self->{max_batch}; - for ($beg..$end) { - last if $self->{quit}; - $art = $_; - if (--$n < 0) { - _done_for_now($self); - $itrk->update_last(0, $last_art); - $n = $self->{max_batch}; - } - my $raw = $nn->article($art); - unless (defined($raw)) { - my $msg = $nn->message; - if ($nn->code == 421) { # pseudo response from Net::Cmd - $err = "E: $msg"; - last; - } else { # probably just a deleted message (spam) - warn "W: $msg"; - next; - } - } - s/\r\n/\n/ for @$raw; - $raw = join('', @$raw); - if (ref($inboxes)) { - for my $ibx (@$inboxes) { - my $eml = PublicInbox::Eml->new($raw); - import_eml($self, $ibx, $eml); - } - } elsif ($inboxes eq 'watchspam') { - my $eml = PublicInbox::Eml->new(\$raw); - $self->{pi_cfg}->each_inbox(\&remove_eml_i, - $self, $eml, "$uri ARTICLE $art"); - } else { - die "BUG: destination unknown $inboxes"; - } - $last_art = $art; - } - _done_for_now($self); - $itrk->update_last(0, $last_art); - $err; -} - sub watch_nntp_init ($$) { my ($self, $poll) = @_; nntp_common_init($self); # read args from config diff --git a/t/imapd.t b/t/imapd.t index 0583dfdd..f1b498a7 100644 --- a/t/imapd.t +++ b/t/imapd.t @@ -507,7 +507,7 @@ SKIP: { $ii->close; PublicInbox::DS->Reset; seek($err, 0, 0); - my @err = grep(!/^I:/, <$err>); + my @err = grep(!/^(?:I:|#)/, <$err>); is(@err, 0, 'no warnings/errors from -watch'.join(' ', @err)); if ($ENV{TEST_KILL_IMAPD}) { # not sure how reliable this test can be diff --git a/t/nntpd.t b/t/nntpd.t index 18aaccbe..16a2ab76 100644 --- a/t/nntpd.t +++ b/t/nntpd.t @@ -459,7 +459,7 @@ sub test_watch { $cfg->each_inbox(sub { shift->unsubscribe_unlock('ident') }); $ii->close; PublicInbox::DS->Reset; - my @err = grep(!/^I:/, <$err>); + my @err = grep(!/^(?:I:|#)/, <$err>); is(@err, 0, 'no warnings/errors from -watch'.join(' ', @err)); my @ls = xqx(['git', "--git-dir=$inboxdir", qw(ls-tree -r HEAD)]); isnt(scalar(@ls), 0, 'imported something');