]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/WatchMaildir.pm
watch: remove {mdir} array
[public-inbox.git] / lib / PublicInbox / WatchMaildir.pm
index f36aa20aa33f6a466bfb2431f0d5641dc8c45300..8d2dc43268466efc38bea13924171f4e79973c9c 100644 (file)
@@ -40,8 +40,7 @@ sub compile_watchheaders ($) {
 
 sub new {
        my ($class, $config) = @_;
-       my (%mdmap, @mdir, $spamc);
-       my %uniq; # directory => count
+       my (%mdmap, $spamc);
        my %imap; # url => [inbox objects] or 'watchspam'
 
        # "publicinboxwatch" is the documented namespace
@@ -50,14 +49,11 @@ sub new {
        foreach my $pfx (qw(publicinboxwatch publicinboxlearn)) {
                my $k = "$pfx.watchspam";
                defined(my $dirs = $config->{$k}) or next;
-               $dirs = [ $dirs ] if !ref($dirs);
+               $dirs = PublicInbox::Config::_array($dirs);
                for my $dir (@$dirs) {
                        if (is_maildir($dir)) {
                                # skip "new", no MUA has seen it, yet.
-                               my $cur = "$dir/cur";
-                               push @mdir, $cur;
-                               $uniq{$cur}++;
-                               $mdmap{$cur} = 'watchspam';
+                               $mdmap{"$dir/cur"} = 'watchspam';
                        } elsif (my $url = imap_url($dir)) {
                                $imap{$url} = 'watchspam';
                        } else {
@@ -75,34 +71,35 @@ sub new {
                # need to make all inboxes writable for spam removal:
                my $ibx = $_[0] = PublicInbox::InboxWritable->new($_[0]);
 
-               my $watch = $ibx->{watch} or return;
-               if (is_maildir($watch)) {
-                       compile_watchheaders($ibx);
-                       my ($new, $cur) = ("$watch/new", "$watch/cur");
-                       return if is_watchspam($cur, $mdmap{$cur}, $ibx);
-                       push @mdir, $new unless $uniq{$new}++;
-                       push @mdir, $cur unless $uniq{$cur}++;
-                       push @{$mdmap{$new} ||= []}, $ibx;
-                       push @{$mdmap{$cur} ||= []}, $ibx;
-               } elsif (my $url = imap_url($watch)) {
-                       return if is_watchspam($url, $imap{$url}, $ibx);
-                       compile_watchheaders($ibx);
-                       push @{$imap{$url} ||= []}, $ibx;
-               } else {
-                       warn "watch unsupported: $k=$watch\n";
+               my $watches = $ibx->{watch} or return;
+               $watches = PublicInbox::Config::_array($watches);
+               for my $watch (@$watches) {
+                       if (is_maildir($watch)) {
+                               compile_watchheaders($ibx);
+                               my ($new, $cur) = ("$watch/new", "$watch/cur");
+                               my $cur_dst = $mdmap{$cur} //= [];
+                               return if is_watchspam($cur, $cur_dst, $ibx);
+                               push @{$mdmap{$new} //= []}, $ibx;
+                               push @$cur_dst, $ibx;
+                       } elsif (my $url = imap_url($watch)) {
+                               return if is_watchspam($url, $imap{$url}, $ibx);
+                               compile_watchheaders($ibx);
+                               push @{$imap{$url} ||= []}, $ibx;
+                       } else {
+                               warn "watch unsupported: $k=$watch\n";
+                       }
                }
        });
-       return unless scalar(@mdir) || scalar(keys %imap);
 
        my $mdre;
-       if (@mdir) {
-               $mdre = join('|', map { quotemeta($_) } @mdir);
+       if (scalar keys %mdmap) {
+               $mdre = join('|', map { quotemeta($_) } keys %mdmap);
                $mdre = qr!\A($mdre)/!;
        }
+       return unless $mdre || scalar(keys %imap);
        bless {
                spamcheck => $spamcheck,
                mdmap => \%mdmap,
-               mdir => \@mdir,
                mdre => $mdre,
                config => $config,
                imap => scalar keys %imap ? \%imap : undef,
@@ -208,9 +205,11 @@ sub quit {
        }
        if (my $idle_mic = $self->{idle_mic}) {
                eval { $idle_mic->done };
-               warn "IDLE DONE error: $@\n" if $@;
-               eval { $idle_mic->disconnect };
-               warn "IDLE LOGOUT error: $@\n" if $@;
+               if ($@) {
+                       warn "IDLE DONE error: $@\n";
+                       eval { $idle_mic->disconnect };
+                       warn "IDLE LOGOUT error: $@\n" if $@;
+               }
        }
 }
 
@@ -225,7 +224,8 @@ sub watch_fs_init ($) {
                $self->{done_timer} //= PublicInbox::DS::requeue($done);
        };
        require PublicInbox::DirIdle;
-       PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD
+       # inotify_create + EPOLL_CTL_ADD
+       PublicInbox::DirIdle->new([keys %{$self->{mdmap}}], $cb);
 }
 
 # returns the git config section name, e.g [imap "imaps://user@example.com"]
@@ -274,6 +274,15 @@ sub imap_common_init ($) {
                $self->{imap_opt}->{$sec}->{poll_intvl} = $to if $to;
                $to = cfg_intvl($cfg, 'imap', 'IdleInterval', $sec, $url);
                $self->{imap_opt}->{$sec}->{idle_intvl} = $to if $to;
+
+               my $key = lc("imap.$sec.fetchBatchSize");
+               my $bs = $cfg->{lc($key)} //
+                       $cfg->urlmatch('imap.fetchBatchSize', $url) // next;
+               if ($bs =~ /\A([0-9]+)\z/) {
+                       $self->{imap_opt}->{$sec}->{batch_size} = $bs;
+               } else {
+                       warn "W: $key=$bs is not an integer\n";
+               }
        }
        $mic_args;
 }
@@ -335,8 +344,8 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient
        $mic;
 }
 
-sub imap_import_msg ($$$$$$) {
-       my ($self, $itrk, $url, $r_uidval, $uid, $raw) = @_;
+sub imap_import_msg ($$$$) {
+       my ($self, $url, $uid, $raw) = @_;
        # our target audience expects LF-only, save storage
        $$raw =~ s/\r\n/\n/sg;
 
@@ -353,7 +362,6 @@ sub imap_import_msg ($$$$$$) {
        } else {
                die "BUG: destination unknown $inboxes";
        }
-       $itrk->update_last($url, $r_uidval, $uid);
 }
 
 sub imap_fetch_all ($$$) {
@@ -373,8 +381,8 @@ sub imap_fetch_all ($$$) {
                return "E: $url cannot get UIDVALIDITY";
        $r_uidnext //= $mic->uidnext($mbx) //
                return "E: $url cannot get UIDNEXT";
-       my $itrk = PublicInbox::IMAPTracker->new;
-       my ($l_uidval, $l_uid) = $itrk->get_last($url);
+       my $itrk = PublicInbox::IMAPTracker->new($url);
+       my ($l_uidval, $l_uid) = $itrk->get_last;
        $l_uidval //= $r_uidval; # first time
        $l_uid //= 1;
        if ($l_uidval != $r_uidval) {
@@ -390,48 +398,60 @@ sub imap_fetch_all ($$$) {
 
        warn "I: $url fetching UID $l_uid:$r_uid\n";
        $mic->Uid(1); # the default, we hope
-       my $uids;
+       my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
        my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
+
+       # TODO: FLAGS may be useful for personal use
        my $key = $req;
        $key =~ s/\.PEEK//;
-       my $uid;
+       my ($uids, $batch);
        my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
        local $SIG{__WARN__} = sub {
-               $uid //= -1;
-               $warn_cb->("$url UID:$uid\n");
+               $batch //= '?';
+               $warn_cb->("$url UID:$batch\n");
                $warn_cb->(@_);
        };
        my $err;
        do {
+               # I wish "UID FETCH $START:*" could work, but:
+               # 1) servers do not need to return results in any order
+               # 2) Mail::IMAPClient doesn't offer a streaming API
                $uids = $mic->search("UID $l_uid:*") or
                        return "E: $url UID SEARCH $l_uid:* error: $!";
                return if scalar(@$uids) == 0;
 
                # RFC 3501 doesn't seem to indicate order of UID SEARCH
-               # responses, so sort it ourselves
+               # responses, so sort it ourselves.  Order matters so
+               # IMAPTracker can store the newest UID.
                @$uids = sort { $a <=> $b } @$uids;
 
                # Did we actually get new messages?
                return if $uids->[0] < $l_uid;
 
                $l_uid = $uids->[-1] + 1; # for next search
+               my $last_uid;
 
-               $itrk->{dbh}->begin_work;
-               while (defined(($uid = shift(@$uids)))) {
-                       local $0 = "UID:$uid $mbx $sec";
-                       my $r = $mic->fetch_hash($uid, $req);
+               while (scalar @$uids) {
+                       my @batch = splice(@$uids, 0, $bs);
+                       $batch = join(',', @batch);
+                       local $0 = "UID:$batch $mbx $sec";
+                       my $r = $mic->fetch_hash($batch, $req);
                        unless ($r) { # network error?
-                               $err = "E: $url UID FETCH $uid error: $!";
+                               $err = "E: $url UID FETCH $batch error: $!";
                                last;
                        }
-                       # messages get deleted, so holes appear
-                       defined(my $raw = delete $r->{$uid}->{$key}) or next;
-                       imap_import_msg($self, $itrk, $url, $r_uidval, $uid,
-                                       \$raw);
+                       for my $uid (@batch) {
+                               # messages get deleted, so holes appear
+                               my $per_uid = delete $r->{$uid} // next;
+                               my $raw = delete($per_uid->{$key}) // next;
+                               imap_import_msg($self, $url, $uid, \$raw);
+                               $last_uid = $uid;
+                               last if $self->{quit};
+                       }
                        last if $self->{quit};
                }
                _done_for_now($self);
-               $itrk->{dbh}->commit;
+               $itrk->update_last($r_uidval, $last_uid) if defined $last_uid;
        } until ($err || $self->{quit});
        $err;
 }
@@ -662,7 +682,7 @@ sub fs_scan_step {
                $opendirs->{$dir} = $dh if $n < 0;
        }
        if ($op && $op eq 'full') {
-               foreach my $dir (@{$self->{mdir}}) {
+               foreach my $dir (keys %{$self->{mdmap}}) {
                        next if $opendirs->{$dir}; # already in progress
                        my $ok = opendir(my $dh, $dir);
                        unless ($ok) {