]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/Watch.pm
No ext_urls
[public-inbox.git] / lib / PublicInbox / Watch.pm
index 0b72bd160330a1474b0c9387eda79106cc934767..90d82d21401e0a227acb2256d9bffd0613aaa97e 100644 (file)
@@ -1,8 +1,8 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # ref: https://cr.yp.to/proto/maildir.html
-#      httsp://wiki2.dovecot.org/MailboxFormat/Maildir
+#      https://wiki2.dovecot.org/MailboxFormat/Maildir
 package PublicInbox::Watch;
 use strict;
 use v5.10.1;
@@ -12,11 +12,9 @@ use PublicInbox::MdirReader;
 use PublicInbox::NetReader;
 use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
-use PublicInbox::Sigfd;
-use PublicInbox::DS qw(now add_timer);
+use PublicInbox::DS qw(now add_timer awaitpid);
 use PublicInbox::MID qw(mids);
 use PublicInbox::ContentHash qw(content_hash);
-use PublicInbox::EOFpipe;
 use POSIX qw(_exit WNOHANG);
 
 sub compile_watchheaders ($) {
@@ -53,8 +51,7 @@ sub new {
        # indefinitely...
        foreach my $pfx (qw(publicinboxwatch publicinboxlearn)) {
                my $k = "$pfx.watchspam";
-               defined(my $dirs = $cfg->{$k}) or next;
-               $dirs = PublicInbox::Config::_array($dirs);
+               my $dirs = $cfg->get_all($k) // next;
                for my $dir (@$dirs) {
                        my $uri;
                        if (is_maildir($dir)) {
@@ -246,14 +243,9 @@ sub quit_done ($) {
        return unless $self->{quit};
 
        # don't have reliable wakeups, keep signalling
-       my $done = 1;
-       for (qw(idle_pids poll_pids)) {
-               my $pids = $self->{$_} or next;
-               for (keys %$pids) {
-                       $done = undef if kill('QUIT', $_);
-               }
-       }
-       $done;
+       my $live = grep { kill('QUIT', $_) } keys %{$self->{pids}};
+       add_timer(0.01, \&quit_done, $self) if $live;
+       $live == 0;
 }
 
 sub quit {
@@ -284,33 +276,13 @@ sub watch_fs_init ($) {
        };
        require PublicInbox::DirIdle;
        # inotify_create + EPOLL_CTL_ADD
-       PublicInbox::DirIdle->new([keys %{$self->{mdmap}}], $cb);
-}
-
-sub imap_import_msg ($$$$$) {
-       my ($self, $uri, $uid, $raw, $flags) = @_;
-       # our target audience expects LF-only, save storage
-       $$raw =~ s/\r\n/\n/sg;
-
-       my $inboxes = $self->{imap}->{$$uri};
-       if (ref($inboxes)) {
-               for my $ibx (@$inboxes) {
-                       my $eml = PublicInbox::Eml->new($$raw);
-                       import_eml($self, $ibx, $eml);
-               }
-       } elsif ($inboxes eq 'watchspam') {
-               return if $flags !~ /\\Seen\b/; # don't remove unseen messages
-               local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
-               my $eml = PublicInbox::Eml->new($raw);
-               $self->{pi_cfg}->each_inbox(\&remove_eml_i,
-                                               $self, $eml, "$uri UID:$uid");
-       } else {
-               die "BUG: destination unknown $inboxes";
-       }
+       my $dir_idle = PublicInbox::DirIdle->new($cb);
+       $dir_idle->add_watches([keys %{$self->{mdmap}}]);
 }
 
 sub net_cb { # NetReader::(nntp|imap)_each callback
        my ($uri, $art, $kw, $eml, $self, $inboxes) = @_;
+       return if grep(/\Adraft\z/, @$kw);
        local $self->{cur_uid} = $art; # IMAP UID or NNTP article
        if (ref($inboxes)) {
                my @ibx = @$inboxes;
@@ -321,6 +293,9 @@ sub net_cb { # NetReader::(nntp|imap)_each callback
                }
                import_eml($self, $last, $eml);
        } elsif ($inboxes eq 'watchspam') {
+               if ($uri->scheme =~ /\Aimaps?\z/ && !grep(/\Aseen\z/, @$kw)) {
+                       return;
+               }
                $self->{pi_cfg}->each_inbox(\&remove_eml_i,
                                $self, $eml, "$uri #$art");
        } else {
@@ -347,8 +322,9 @@ sub imap_idle_once ($$$$) {
        my ($self, $mic, $intvl, $uri) = @_;
        my $i = $intvl //= (29 * 60);
        my $end = now() + $intvl;
-       warn "I: $uri idling for ${intvl}s\n";
+       warn "# $uri idling for ${intvl}s\n";
        local $0 = "IDLE $0";
+       return if $self->{quit};
        unless ($mic->idle) {
                return if $self->{quit};
                return "E: IDLE failed on $uri: $!";
@@ -372,12 +348,13 @@ sub imap_idle_once ($$$$) {
 sub watch_imap_idle_1 ($$$) {
        my ($self, $uri, $intvl) = @_;
        my $sec = uri_section($uri);
-       my $mic_arg = $self->{mic_arg}->{$sec} or
+       my $mic_arg = $self->{net_arg}->{$sec} or
                        die "BUG: no Mail::IMAPClient->new arg for $sec";
        my $mic;
        local $0 = $uri->mailbox." $sec";
        until ($self->{quit}) {
-               $mic //= PublicInbox::IMAPClient->new(%$mic_arg);
+               $mic //= PublicInbox::NetReader::mic_new(
+                                       $self, $mic_arg, $sec, $uri);
                my $err;
                if ($mic && $mic->IsConnected) {
                        local $self->{mics_cached}->{$sec} = $mic;
@@ -398,65 +375,46 @@ sub watch_imap_idle_1 ($$$) {
 
 sub watch_atfork_child ($) {
        my ($self) = @_;
-       delete $self->{idle_pids};
-       delete $self->{poll_pids};
+       delete $self->{pids};
        delete $self->{opendirs};
        PublicInbox::DS->Reset;
-       %SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT');
+       my $sig = delete $self->{sig};
+       $sig->{CHLD} = 'DEFAULT';
+       @SIG{keys %$sig} = values %$sig;
        PublicInbox::DS::sig_setmask($self->{oldset});
 }
 
-sub watch_atfork_parent ($) {
-       my ($self) = @_;
-       _done_for_now($self);
-       PublicInbox::DS::block_signals();
-}
+sub watch_atfork_parent ($) { _done_for_now($_[0]) }
 
 sub imap_idle_requeue { # DS::add_timer callback
-       my ($self, $uri_intvl) = @_;
+       my ($self, $uri, $intvl) = @_;
        return if $self->{quit};
-       push @{$self->{idle_todo}}, $uri_intvl;
+       push @{$self->{idle_todo}}, $uri, $intvl;
        event_step($self);
 }
 
-sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
-       my ($self, $pid) = @_;
-       my $uri_intvl = delete $self->{idle_pids}->{$pid} or
-               die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
-
-       my ($uri, $intvl) = @$uri_intvl;
+sub imap_idle_reap { # awaitpid callback
+       my ($pid, $self, $uri, $intvl) = @_;
+       delete $self->{pids}->{$pid};
        return if $self->{quit};
        warn "W: PID=$pid on $uri died: \$?=$?\n" if $?;
-       add_timer(60, \&imap_idle_requeue, $self, $uri_intvl);
-}
-
-sub reap { # callback for EOFpipe
-       my ($pid, $cb, $self) = @{$_[0]};
-       my $ret = waitpid($pid, 0);
-       if ($ret == $pid) {
-               $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
-       } else {
-               warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
-       }
+       add_timer(60, \&imap_idle_requeue, $self, $uri, $intvl);
 }
 
-sub imap_idle_fork ($$) {
-       my ($self, $uri_intvl) = @_;
-       my ($uri, $intvl) = @$uri_intvl;
-       pipe(my ($r, $w)) or die "pipe: $!";
+sub imap_idle_fork {
+       my ($self, $uri, $intvl) = @_;
+       return if $self->{quit};
        my $seed = rand(0xffffffff);
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
                srand($seed);
                eval { Net::SSLeay::randomize() };
-               close $r;
                watch_atfork_child($self);
                watch_imap_idle_1($self, $uri, $intvl);
-               close $w;
                _exit(0);
        }
-       $self->{idle_pids}->{$pid} = $uri_intvl;
-       PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
+       $self->{pids}->{$pid} = undef;
+       awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl);
 }
 
 sub event_step {
@@ -464,13 +422,12 @@ sub event_step {
        return if $self->{quit};
        my $idle_todo = $self->{idle_todo};
        if ($idle_todo && @$idle_todo) {
-               my $oldset = watch_atfork_parent($self);
+               watch_atfork_parent($self);
                eval {
-                       while (my $uri_intvl = shift(@$idle_todo)) {
-                               imap_idle_fork($self, $uri_intvl);
+                       while (my ($uri, $intvl) = splice(@$idle_todo, 0, 2)) {
+                               imap_idle_fork($self, $uri, $intvl);
                        }
                };
-               PublicInbox::DS::sig_setmask($oldset);
                die $@ if $@;
        }
        fs_scan_step($self) if $self->{mdre};
@@ -506,53 +463,47 @@ sub watch_nntp_fetch_all ($$) {
 sub poll_fetch_fork { # DS::add_timer callback
        my ($self, $intvl, $uris) = @_;
        return if $self->{quit};
-       pipe(my ($r, $w)) or die "pipe: $!";
-       my $oldset = watch_atfork_parent($self);
+       watch_atfork_parent($self);
+       my @nntp;
+       my @imap = grep { # push() always returns > 0
+               $_->scheme =~ m!\Aimaps?!i ? 1 : (push(@nntp, $_) < 0)
+       } @$uris;
        my $seed = rand(0xffffffff);
-       my $pid = fork;
-       if (defined($pid) && $pid == 0) {
+       my $pid = fork // die "fork: $!";
+       if ($pid == 0) {
                srand($seed);
                eval { Net::SSLeay::randomize() };
-               close $r;
                watch_atfork_child($self);
-               if ($uris->[0]->scheme =~ m!\Aimaps?!i) {
-                       watch_imap_fetch_all($self, $uris);
-               } else {
-                       watch_nntp_fetch_all($self, $uris);
-               }
-               close $w;
+               watch_imap_fetch_all($self, \@imap) if @imap;
+               watch_nntp_fetch_all($self, \@nntp) if @nntp;
                _exit(0);
        }
-       PublicInbox::DS::sig_setmask($oldset);
-       die "fork: $!"  unless defined $pid;
-       $self->{poll_pids}->{$pid} = [ $intvl, $uris ];
-       PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
+       $self->{pids}->{$pid} = undef;
+       awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris);
 }
 
-sub poll_fetch_reap {
-       my ($self, $pid) = @_;
-       my $intvl_uris = delete $self->{poll_pids}->{$pid} or
-               die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+sub poll_fetch_reap { # awaitpid callback
+       my ($pid, $self, $intvl, $uris) = @_;
+       delete $self->{pids}->{$pid};
        return if $self->{quit};
-       my ($intvl, $uris) = @$intvl_uris;
        if ($?) {
                warn "W: PID=$pid died: \$?=$?\n", map { "$_\n" } @$uris;
        }
-       warn("I: will check $_ in ${intvl}s\n") for @$uris;
+       warn("# will check $_ in ${intvl}s\n") for @$uris;
        add_timer($intvl, \&poll_fetch_fork, $self, $intvl, $uris);
 }
 
 sub watch_imap_init ($$) {
        my ($self, $poll) = @_;
-       my $mics = imap_common_init($self); # read args from config
-       my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
+       my $mics = PublicInbox::NetReader::imap_common_init($self);
+       my $idle = []; # [ uri1, intvl1, uri2, intvl2 ]
        for my $uri (@{$self->{imap_order}}) {
                my $sec = uri_section($uri);
                my $mic = $mics->{$sec};
-               my $intvl = $self->{imap_opt}->{$sec}->{pollInterval};
+               my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval};
                if ($mic->has_capability('IDLE') && !$intvl) {
-                       $intvl = $self->{imap_opt}->{$sec}->{idleInterval};
-                       push @$idle, [ $uri, $intvl // () ];
+                       $intvl = $self->{cfg_opt}->{$sec}->{idleInterval};
+                       push @$idle, $uri, $intvl;
                } else {
                        push @{$poll->{$intvl || 120}}, $uri;
                }
@@ -565,10 +516,10 @@ sub watch_imap_init ($$) {
 
 sub watch_nntp_init ($$) {
        my ($self, $poll) = @_;
-       nntp_common_init($self); # read args from config
+       PublicInbox::NetReader::nntp_common_init($self);
        for my $uri (@{$self->{nntp_order}}) {
                my $sec = uri_section($uri);
-               my $intvl = $self->{nntp_opt}->{$sec}->{pollInterval};
+               my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval};
                push @{$poll->{$intvl || 120}}, $uri;
        }
 }
@@ -586,7 +537,7 @@ sub watch { # main entry point
        }
        watch_fs_init($self) if $self->{mdre};
        PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
-       PublicInbox::DS->EventLoop; # calls ->event_step
+       PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step
        _done_for_now($self);
 }
 
@@ -701,4 +652,6 @@ EOF
        undef;
 }
 
+sub folder_select { 'select' } # for PublicInbox::NetReader
+
 1;