X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FWatch.pm;h=90d82d21401e0a227acb2256d9bffd0613aaa97e;hb=HEAD;hp=0b72bd160330a1474b0c9387eda79106cc934767;hpb=48bae7dc634583b58fd747bd557392cac60123b3;p=public-inbox.git diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 0b72bd16..90d82d21 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -1,8 +1,8 @@ -# Copyright (C) 2016-2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # # ref: https://cr.yp.to/proto/maildir.html -# httsp://wiki2.dovecot.org/MailboxFormat/Maildir +# https://wiki2.dovecot.org/MailboxFormat/Maildir package PublicInbox::Watch; use strict; use v5.10.1; @@ -12,11 +12,9 @@ use PublicInbox::MdirReader; use PublicInbox::NetReader; use PublicInbox::Filter::Base qw(REJECT); use PublicInbox::Spamcheck; -use PublicInbox::Sigfd; -use PublicInbox::DS qw(now add_timer); +use PublicInbox::DS qw(now add_timer awaitpid); use PublicInbox::MID qw(mids); use PublicInbox::ContentHash qw(content_hash); -use PublicInbox::EOFpipe; use POSIX qw(_exit WNOHANG); sub compile_watchheaders ($) { @@ -53,8 +51,7 @@ sub new { # indefinitely... foreach my $pfx (qw(publicinboxwatch publicinboxlearn)) { my $k = "$pfx.watchspam"; - defined(my $dirs = $cfg->{$k}) or next; - $dirs = PublicInbox::Config::_array($dirs); + my $dirs = $cfg->get_all($k) // next; for my $dir (@$dirs) { my $uri; if (is_maildir($dir)) { @@ -246,14 +243,9 @@ sub quit_done ($) { return unless $self->{quit}; # don't have reliable wakeups, keep signalling - my $done = 1; - for (qw(idle_pids poll_pids)) { - my $pids = $self->{$_} or next; - for (keys %$pids) { - $done = undef if kill('QUIT', $_); - } - } - $done; + my $live = grep { kill('QUIT', $_) } keys %{$self->{pids}}; + add_timer(0.01, \&quit_done, $self) if $live; + $live == 0; } sub quit { @@ -284,33 +276,13 @@ sub watch_fs_init ($) { }; require PublicInbox::DirIdle; # inotify_create + EPOLL_CTL_ADD - PublicInbox::DirIdle->new([keys %{$self->{mdmap}}], $cb); -} - -sub imap_import_msg ($$$$$) { - my ($self, $uri, $uid, $raw, $flags) = @_; - # our target audience expects LF-only, save storage - $$raw =~ s/\r\n/\n/sg; - - my $inboxes = $self->{imap}->{$$uri}; - if (ref($inboxes)) { - for my $ibx (@$inboxes) { - my $eml = PublicInbox::Eml->new($$raw); - import_eml($self, $ibx, $eml); - } - } elsif ($inboxes eq 'watchspam') { - return if $flags !~ /\\Seen\b/; # don't remove unseen messages - local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); - my $eml = PublicInbox::Eml->new($raw); - $self->{pi_cfg}->each_inbox(\&remove_eml_i, - $self, $eml, "$uri UID:$uid"); - } else { - die "BUG: destination unknown $inboxes"; - } + my $dir_idle = PublicInbox::DirIdle->new($cb); + $dir_idle->add_watches([keys %{$self->{mdmap}}]); } sub net_cb { # NetReader::(nntp|imap)_each callback my ($uri, $art, $kw, $eml, $self, $inboxes) = @_; + return if grep(/\Adraft\z/, @$kw); local $self->{cur_uid} = $art; # IMAP UID or NNTP article if (ref($inboxes)) { my @ibx = @$inboxes; @@ -321,6 +293,9 @@ sub net_cb { # NetReader::(nntp|imap)_each callback } import_eml($self, $last, $eml); } elsif ($inboxes eq 'watchspam') { + if ($uri->scheme =~ /\Aimaps?\z/ && !grep(/\Aseen\z/, @$kw)) { + return; + } $self->{pi_cfg}->each_inbox(\&remove_eml_i, $self, $eml, "$uri #$art"); } else { @@ -347,8 +322,9 @@ sub imap_idle_once ($$$$) { my ($self, $mic, $intvl, $uri) = @_; my $i = $intvl //= (29 * 60); my $end = now() + $intvl; - warn "I: $uri idling for ${intvl}s\n"; + warn "# $uri idling for ${intvl}s\n"; local $0 = "IDLE $0"; + return if $self->{quit}; unless ($mic->idle) { return if $self->{quit}; return "E: IDLE failed on $uri: $!"; @@ -372,12 +348,13 @@ sub imap_idle_once ($$$$) { sub watch_imap_idle_1 ($$$) { my ($self, $uri, $intvl) = @_; my $sec = uri_section($uri); - my $mic_arg = $self->{mic_arg}->{$sec} or + my $mic_arg = $self->{net_arg}->{$sec} or die "BUG: no Mail::IMAPClient->new arg for $sec"; my $mic; local $0 = $uri->mailbox." $sec"; until ($self->{quit}) { - $mic //= PublicInbox::IMAPClient->new(%$mic_arg); + $mic //= PublicInbox::NetReader::mic_new( + $self, $mic_arg, $sec, $uri); my $err; if ($mic && $mic->IsConnected) { local $self->{mics_cached}->{$sec} = $mic; @@ -398,65 +375,46 @@ sub watch_imap_idle_1 ($$$) { sub watch_atfork_child ($) { my ($self) = @_; - delete $self->{idle_pids}; - delete $self->{poll_pids}; + delete $self->{pids}; delete $self->{opendirs}; PublicInbox::DS->Reset; - %SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT'); + my $sig = delete $self->{sig}; + $sig->{CHLD} = 'DEFAULT'; + @SIG{keys %$sig} = values %$sig; PublicInbox::DS::sig_setmask($self->{oldset}); } -sub watch_atfork_parent ($) { - my ($self) = @_; - _done_for_now($self); - PublicInbox::DS::block_signals(); -} +sub watch_atfork_parent ($) { _done_for_now($_[0]) } sub imap_idle_requeue { # DS::add_timer callback - my ($self, $uri_intvl) = @_; + my ($self, $uri, $intvl) = @_; return if $self->{quit}; - push @{$self->{idle_todo}}, $uri_intvl; + push @{$self->{idle_todo}}, $uri, $intvl; event_step($self); } -sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback - my ($self, $pid) = @_; - my $uri_intvl = delete $self->{idle_pids}->{$pid} or - die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; - - my ($uri, $intvl) = @$uri_intvl; +sub imap_idle_reap { # awaitpid callback + my ($pid, $self, $uri, $intvl) = @_; + delete $self->{pids}->{$pid}; return if $self->{quit}; warn "W: PID=$pid on $uri died: \$?=$?\n" if $?; - add_timer(60, \&imap_idle_requeue, $self, $uri_intvl); -} - -sub reap { # callback for EOFpipe - my ($pid, $cb, $self) = @{$_[0]}; - my $ret = waitpid($pid, 0); - if ($ret == $pid) { - $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap - } else { - warn "W: waitpid($pid) => ", $ret // "($!)", "\n"; - } + add_timer(60, \&imap_idle_requeue, $self, $uri, $intvl); } -sub imap_idle_fork ($$) { - my ($self, $uri_intvl) = @_; - my ($uri, $intvl) = @$uri_intvl; - pipe(my ($r, $w)) or die "pipe: $!"; +sub imap_idle_fork { + my ($self, $uri, $intvl) = @_; + return if $self->{quit}; my $seed = rand(0xffffffff); my $pid = fork // die "fork: $!"; if ($pid == 0) { srand($seed); eval { Net::SSLeay::randomize() }; - close $r; watch_atfork_child($self); watch_imap_idle_1($self, $uri, $intvl); - close $w; _exit(0); } - $self->{idle_pids}->{$pid} = $uri_intvl; - PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]); + $self->{pids}->{$pid} = undef; + awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl); } sub event_step { @@ -464,13 +422,12 @@ sub event_step { return if $self->{quit}; my $idle_todo = $self->{idle_todo}; if ($idle_todo && @$idle_todo) { - my $oldset = watch_atfork_parent($self); + watch_atfork_parent($self); eval { - while (my $uri_intvl = shift(@$idle_todo)) { - imap_idle_fork($self, $uri_intvl); + while (my ($uri, $intvl) = splice(@$idle_todo, 0, 2)) { + imap_idle_fork($self, $uri, $intvl); } }; - PublicInbox::DS::sig_setmask($oldset); die $@ if $@; } fs_scan_step($self) if $self->{mdre}; @@ -506,53 +463,47 @@ sub watch_nntp_fetch_all ($$) { sub poll_fetch_fork { # DS::add_timer callback my ($self, $intvl, $uris) = @_; return if $self->{quit}; - pipe(my ($r, $w)) or die "pipe: $!"; - my $oldset = watch_atfork_parent($self); + watch_atfork_parent($self); + my @nntp; + my @imap = grep { # push() always returns > 0 + $_->scheme =~ m!\Aimaps?!i ? 1 : (push(@nntp, $_) < 0) + } @$uris; my $seed = rand(0xffffffff); - my $pid = fork; - if (defined($pid) && $pid == 0) { + my $pid = fork // die "fork: $!"; + if ($pid == 0) { srand($seed); eval { Net::SSLeay::randomize() }; - close $r; watch_atfork_child($self); - if ($uris->[0]->scheme =~ m!\Aimaps?!i) { - watch_imap_fetch_all($self, $uris); - } else { - watch_nntp_fetch_all($self, $uris); - } - close $w; + watch_imap_fetch_all($self, \@imap) if @imap; + watch_nntp_fetch_all($self, \@nntp) if @nntp; _exit(0); } - PublicInbox::DS::sig_setmask($oldset); - die "fork: $!" unless defined $pid; - $self->{poll_pids}->{$pid} = [ $intvl, $uris ]; - PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]); + $self->{pids}->{$pid} = undef; + awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris); } -sub poll_fetch_reap { - my ($self, $pid) = @_; - my $intvl_uris = delete $self->{poll_pids}->{$pid} or - die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; +sub poll_fetch_reap { # awaitpid callback + my ($pid, $self, $intvl, $uris) = @_; + delete $self->{pids}->{$pid}; return if $self->{quit}; - my ($intvl, $uris) = @$intvl_uris; if ($?) { warn "W: PID=$pid died: \$?=$?\n", map { "$_\n" } @$uris; } - warn("I: will check $_ in ${intvl}s\n") for @$uris; + warn("# will check $_ in ${intvl}s\n") for @$uris; add_timer($intvl, \&poll_fetch_fork, $self, $intvl, $uris); } sub watch_imap_init ($$) { my ($self, $poll) = @_; - my $mics = imap_common_init($self); # read args from config - my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ] + my $mics = PublicInbox::NetReader::imap_common_init($self); + my $idle = []; # [ uri1, intvl1, uri2, intvl2 ] for my $uri (@{$self->{imap_order}}) { my $sec = uri_section($uri); my $mic = $mics->{$sec}; - my $intvl = $self->{imap_opt}->{$sec}->{pollInterval}; + my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval}; if ($mic->has_capability('IDLE') && !$intvl) { - $intvl = $self->{imap_opt}->{$sec}->{idleInterval}; - push @$idle, [ $uri, $intvl // () ]; + $intvl = $self->{cfg_opt}->{$sec}->{idleInterval}; + push @$idle, $uri, $intvl; } else { push @{$poll->{$intvl || 120}}, $uri; } @@ -565,10 +516,10 @@ sub watch_imap_init ($$) { sub watch_nntp_init ($$) { my ($self, $poll) = @_; - nntp_common_init($self); # read args from config + PublicInbox::NetReader::nntp_common_init($self); for my $uri (@{$self->{nntp_order}}) { my $sec = uri_section($uri); - my $intvl = $self->{nntp_opt}->{$sec}->{pollInterval}; + my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval}; push @{$poll->{$intvl || 120}}, $uri; } } @@ -586,7 +537,7 @@ sub watch { # main entry point } watch_fs_init($self) if $self->{mdre}; PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done }); - PublicInbox::DS->EventLoop; # calls ->event_step + PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step _done_for_now($self); } @@ -701,4 +652,6 @@ EOF undef; } +sub folder_select { 'select' } # for PublicInbox::NetReader + 1;