Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/Watch.pm
treewide: avoid `goto &NAME' for tail recursion
[public-inbox.git] / lib / PublicInbox / Watch.pm
index 5f78613961db1058506b8193c4ea7bd55c0522f3..8bbce92992aa004e26271394bb0c75cffa31fb5d 100644 (file)
@@ -14,7 +14,8 @@ use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
 use PublicInbox::MID qw(mids);
 use PublicInbox::ContentHash qw(content_hash);
-use POSIX qw(_exit);
+use PublicInbox::EOFpipe;
+use POSIX qw(_exit WNOHANG);
 
 sub compile_watchheaders ($) {
        my ($ibx) = @_;
@@ -132,17 +133,35 @@ sub _done_for_now {
 }
 
 sub remove_eml_i { # each_inbox callback
-       my ($ibx, $arg) = @_;
-       my ($self, $eml, $loc) = @$arg;
+       my ($ibx, $self, $eml, $loc) = @_;
+
        eval {
-               my $im = _importer_for($self, $ibx);
-               $im->remove($eml, 'spam');
-               if (my $scrub = $ibx->filter($im)) {
-                       my $scrubbed = $scrub->scrub($eml, 1);
-                       if ($scrubbed && $scrubbed != REJECT) {
-                               $im->remove($scrubbed, 'spam');
+               # try to avoid taking a lock or unnecessary spawning
+               my $im = $self->{importers}->{"$ibx"};
+               my $scrubbed;
+               if ((!$im || !$im->active) && $ibx->over) {
+                       if (content_exists($ibx, $eml)) {
+                               # continue
+                       } elsif (my $scrub = $ibx->filter($im)) {
+                               $scrubbed = $scrub->scrub($eml, 1);
+                               if ($scrubbed && $scrubbed != REJECT &&
+                                         !content_exists($ibx, $scrubbed)) {
+                                       return;
+                               }
+                       } else {
+                               return;
                        }
                }
+
+               $im //= _importer_for($self, $ibx); # may spawn fast-import
+               $im->remove($eml, 'spam');
+               $scrubbed //= do {
+                       my $scrub = $ibx->filter($im);
+                       $scrub ? $scrub->scrub($eml, 1) : undef;
+               };
+               if ($scrubbed && $scrubbed != REJECT) {
+                       $im->remove($scrubbed, 'spam');
+               }
        };
        if ($@) {
                warn "error removing spam at: $loc from $ibx->{name}: $@\n";
@@ -156,7 +175,7 @@ sub _remove_spam {
        $path =~ /:2,[A-R]*S[T-Za-z]*\z/ or return;
        my $eml = eml_from_path($path) or return;
        local $SIG{__WARN__} = warn_ignore_cb();
-       $self->{config}->each_inbox(\&remove_eml_i, [ $self, $eml, $path ]);
+       $self->{config}->each_inbox(\&remove_eml_i, $self, $eml, $path);
 }
 
 sub import_eml ($$$) {
@@ -249,7 +268,7 @@ sub watch_fs_init ($) {
                delete $self->{done_timer};
                _done_for_now($self);
        };
-       my $cb = sub {
+       my $cb = sub { # called by PublicInbox::DirIdle::event_step
                _try_path($self, $_[0]->fullname);
                $self->{done_timer} //= PublicInbox::DS::requeue($done);
        };
@@ -392,15 +411,15 @@ sub imap_import_msg ($$$$$) {
        if (ref($inboxes)) {
                for my $ibx (@$inboxes) {
                        my $eml = PublicInbox::Eml->new($$raw);
-                       my $x = import_eml($self, $ibx, $eml);
+                       import_eml($self, $ibx, $eml);
                }
        } elsif ($inboxes eq 'watchspam') {
                # we don't remove unseen messages
                if ($flags =~ /\\Seen\b/) {
                        local $SIG{__WARN__} = warn_ignore_cb();
                        my $eml = PublicInbox::Eml->new($raw);
-                       my $arg = [ $self, $eml, "$url UID:$uid" ];
-                       $self->{config}->each_inbox(\&remove_eml_i, $arg);
+                       $self->{config}->each_inbox(\&remove_eml_i,
+                                               $self, $eml, "$url UID:$uid");
                }
        } else {
                die "BUG: destination unknown $inboxes";
@@ -547,7 +566,7 @@ sub watch_imap_idle_1 ($$$) {
                        $err = imap_fetch_all($self, $mic, $url);
                        $err //= imap_idle_once($self, $mic, $intvl, $url);
                } else {
-                       $err = "not connected: $!";
+                       $err = "E: not connected: $!";
                }
                if ($err && !$self->{quit}) {
                        warn $err, "\n";
@@ -570,6 +589,7 @@ sub watch_atfork_child ($) {
 sub watch_atfork_parent ($) {
        my ($self) = @_;
        _done_for_now($self);
+       PublicInbox::Sigfd::block_signals();
 }
 
 sub imap_idle_requeue ($) { # DS::add_timer callback
@@ -591,17 +611,30 @@ sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
                                \&imap_idle_requeue, [ $self, $url_intvl ]);
 }
 
+sub reap { # callback for EOFpipe
+       my ($pid, $cb, $self) = @{$_[0]};
+       my $ret = waitpid($pid, 0);
+       if ($ret == $pid) {
+               $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
+       } else {
+               warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
+       }
+}
+
 sub imap_idle_fork ($$) {
        my ($self, $url_intvl) = @_;
        my ($url, $intvl) = @$url_intvl;
+       pipe(my ($r, $w)) or die "pipe: $!";
        defined(my $pid = fork) or die "fork: $!";
        if ($pid == 0) {
+               close $r;
                watch_atfork_child($self);
                watch_imap_idle_1($self, $url, $intvl);
+               close $w;
                _exit(0);
        }
        $self->{idle_pids}->{$pid} = $url_intvl;
-       PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+       PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
 }
 
 sub event_step {
@@ -609,12 +642,16 @@ sub event_step {
        return if $self->{quit};
        my $idle_todo = $self->{idle_todo};
        if ($idle_todo && @$idle_todo) {
-               watch_atfork_parent($self);
-               while (my $url_intvl = shift(@$idle_todo)) {
-                       imap_idle_fork($self, $url_intvl);
-               }
+               my $oldset = watch_atfork_parent($self);
+               eval {
+                       while (my $url_intvl = shift(@$idle_todo)) {
+                               imap_idle_fork($self, $url_intvl);
+                       }
+               };
+               PublicInbox::Sigfd::sig_setmask($oldset);
+               die $@ if $@;
        }
-       goto(&fs_scan_step) if $self->{mdre};
+       fs_scan_step($self) if $self->{mdre};
 }
 
 sub watch_imap_fetch_all ($$) {
@@ -665,22 +702,27 @@ sub watch_nntp_fetch_all ($$) {
 sub poll_fetch_fork ($) { # DS::add_timer callback
        my ($self, $intvl, $urls) = @{$_[0]};
        return if $self->{quit};
-       watch_atfork_parent($self);
-       defined(my $pid = fork) or die "fork: $!";
-       if ($pid == 0) {
+       pipe(my ($r, $w)) or die "pipe: $!";
+       my $oldset = watch_atfork_parent($self);
+       my $pid = fork;
+       if (defined($pid) && $pid == 0) {
+               close $r;
                watch_atfork_child($self);
                if ($urls->[0] =~ m!\Aimaps?://!i) {
                        watch_imap_fetch_all($self, $urls);
                } else {
                        watch_nntp_fetch_all($self, $urls);
                }
+               close $w;
                _exit(0);
        }
+       PublicInbox::Sigfd::sig_setmask($oldset);
+       die "fork: $!"  unless defined $pid;
        $self->{poll_pids}->{$pid} = [ $intvl, $urls ];
-       PublicInbox::DS::dwaitpid($pid, \&poll_fetch_reap, $self);
+       PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
 }
 
-sub poll_fetch_reap { # PublicInbox::DS::dwaitpid callback
+sub poll_fetch_reap {
        my ($self, $pid) = @_;
        my $intvl_urls = delete $self->{poll_pids}->{$pid} or
                die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
@@ -924,8 +966,8 @@ sub nntp_fetch_all ($$$) {
                        }
                } elsif ($inboxes eq 'watchspam') {
                        my $eml = PublicInbox::Eml->new(\$raw);
-                       my $arg = [ $self, $eml, "$url ARTICLE $art" ];
-                       $self->{config}->each_inbox(\&remove_eml_i, $arg);
+                       $self->{config}->each_inbox(\&remove_eml_i,
+                                       $self, $eml, "$url ARTICLE $art");
                } else {
                        die "BUG: destination unknown $inboxes";
                }
@@ -958,7 +1000,7 @@ sub watch_nntp_init ($$) {
        }
 }
 
-sub watch {
+sub watch { # main entry point
        my ($self, $sig, $oldset) = @_;
        $self->{oldset} = $oldset;
        $self->{sig} = $sig;
@@ -972,7 +1014,7 @@ sub watch {
        }
        watch_fs_init($self) if $self->{mdre};
        PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
-       PublicInbox::DS->EventLoop;
+       PublicInbox::DS->EventLoop; # calls ->event_step
        _done_for_now($self);
 }
 
@@ -1024,7 +1066,7 @@ sub fs_scan_step {
 sub scan {
        my ($self, $op) = @_;
        push @{$self->{ops}}, $op;
-       goto &fs_scan_step;
+       fs_scan_step($self);
 }
 
 sub _importer_for {
@@ -1057,7 +1099,7 @@ sub content_exists ($$) {
 
 sub _spamcheck_cb {
        my ($sc) = @_;
-       sub {
+       sub { # this gets called by (V2Writable||Import)->add
                my ($mime, $ibx) = @_;
                return if content_exists($ibx, $mime);
                my $tmp = '';