X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FWatch.pm;h=c39ce1a7d2f233b55ba42d9256e8c9c32200a5af;hb=af0b0fb7a454470a32c452119d0392e0dedb3fe1;hp=2698c44ab34b8cebaec2e321f75a560b53b1afe2;hpb=03c9119ec613fa43dcf0a50b5f35754f13228bc8;p=public-inbox.git diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 2698c44a..c39ce1a7 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2020 all contributors +# Copyright (C) 2016-2021 all contributors # License: AGPL-3.0+ # # ref: https://cr.yp.to/proto/maildir.html @@ -14,7 +14,8 @@ use PublicInbox::Sigfd; use PublicInbox::DS qw(now); use PublicInbox::MID qw(mids); use PublicInbox::ContentHash qw(content_hash); -use POSIX qw(_exit); +use PublicInbox::EOFpipe; +use POSIX qw(_exit WNOHANG); sub compile_watchheaders ($) { my ($ibx) = @_; @@ -40,7 +41,7 @@ sub compile_watchheaders ($) { } sub new { - my ($class, $config) = @_; + my ($class, $cfg) = @_; my (%mdmap, $spamc); my (%imap, %nntp); # url => [inbox objects] or 'watchspam' @@ -49,7 +50,7 @@ sub new { # indefinitely... foreach my $pfx (qw(publicinboxwatch publicinboxlearn)) { my $k = "$pfx.watchspam"; - defined(my $dirs = $config->{$k}) or next; + defined(my $dirs = $cfg->{$k}) or next; $dirs = PublicInbox::Config::_array($dirs); for my $dir (@$dirs) { my $url; @@ -68,10 +69,10 @@ sub new { my $k = 'publicinboxwatch.spamcheck'; my $default = undef; - my $spamcheck = PublicInbox::Spamcheck::get($config, $k, $default); + my $spamcheck = PublicInbox::Spamcheck::get($cfg, $k, $default); $spamcheck = _spamcheck_cb($spamcheck) if $spamcheck; - $config->each_inbox(sub { + $cfg->each_inbox(sub { # need to make all inboxes writable for spam removal: my $ibx = $_[0] = PublicInbox::InboxWritable->new($_[0]); @@ -112,7 +113,7 @@ sub new { spamcheck => $spamcheck, mdmap => \%mdmap, mdre => $mdre, - config => $config, + pi_cfg => $cfg, imap => scalar keys %imap ? \%imap : undef, nntp => scalar keys %nntp? \%nntp : undef, importers => {}, @@ -132,8 +133,7 @@ sub _done_for_now { } sub remove_eml_i { # each_inbox callback - my ($ibx, $arg) = @_; - my ($self, $eml, $loc) = @$arg; + my ($ibx, $self, $eml, $loc) = @_; eval { # try to avoid taking a lock or unnecessary spawning @@ -175,7 +175,7 @@ sub _remove_spam { $path =~ /:2,[A-R]*S[T-Za-z]*\z/ or return; my $eml = eml_from_path($path) or return; local $SIG{__WARN__} = warn_ignore_cb(); - $self->{config}->each_inbox(\&remove_eml_i, [ $self, $eml, $path ]); + $self->{pi_cfg}->each_inbox(\&remove_eml_i, $self, $eml, $path); } sub import_eml ($$$) { @@ -217,7 +217,7 @@ sub _try_path { warn "unmappable dir: $1\n"; return; } - my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : ''; $warn_cb->($pfx, "path: $path\n", @_); @@ -268,7 +268,7 @@ sub watch_fs_init ($) { delete $self->{done_timer}; _done_for_now($self); }; - my $cb = sub { + my $cb = sub { # called by PublicInbox::DirIdle::event_step _try_path($self, $_[0]->fullname); $self->{done_timer} //= PublicInbox::DS::requeue($done); }; @@ -316,7 +316,7 @@ sub cfg_bool ($$$) { # flesh out common IMAP-specific data structures sub imap_common_init ($) { my ($self) = @_; - my $cfg = $self->{config}; + my $cfg = $self->{pi_cfg}; my $mic_args = {}; # scheme://authority => Mail:IMAPClient arg for my $url (sort keys %{$self->{imap}}) { my $uri = PublicInbox::URIimap->new($url); @@ -411,15 +411,15 @@ sub imap_import_msg ($$$$$) { if (ref($inboxes)) { for my $ibx (@$inboxes) { my $eml = PublicInbox::Eml->new($$raw); - my $x = import_eml($self, $ibx, $eml); + import_eml($self, $ibx, $eml); } } elsif ($inboxes eq 'watchspam') { # we don't remove unseen messages if ($flags =~ /\\Seen\b/) { local $SIG{__WARN__} = warn_ignore_cb(); my $eml = PublicInbox::Eml->new($raw); - my $arg = [ $self, $eml, "$url UID:$uid" ]; - $self->{config}->each_inbox(\&remove_eml_i, $arg); + $self->{pi_cfg}->each_inbox(\&remove_eml_i, + $self, $eml, "$url UID:$uid"); } } else { die "BUG: destination unknown $inboxes"; @@ -467,7 +467,7 @@ sub imap_fetch_all ($$$) { my $key = $req; $key =~ s/\.PEEK//; my ($uids, $batch); - my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : ''; $batch //= '?'; @@ -566,7 +566,7 @@ sub watch_imap_idle_1 ($$$) { $err = imap_fetch_all($self, $mic, $url); $err //= imap_idle_once($self, $mic, $intvl, $url); } else { - $err = "not connected: $!"; + $err = "E: not connected: $!"; } if ($err && !$self->{quit}) { warn $err, "\n"; @@ -611,17 +611,30 @@ sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback \&imap_idle_requeue, [ $self, $url_intvl ]); } +sub reap { # callback for EOFpipe + my ($pid, $cb, $self) = @{$_[0]}; + my $ret = waitpid($pid, 0); + if ($ret == $pid) { + $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap + } else { + warn "W: waitpid($pid) => ", $ret // "($!)", "\n"; + } +} + sub imap_idle_fork ($$) { my ($self, $url_intvl) = @_; my ($url, $intvl) = @$url_intvl; + pipe(my ($r, $w)) or die "pipe: $!"; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { + close $r; watch_atfork_child($self); watch_imap_idle_1($self, $url, $intvl); + close $w; _exit(0); } $self->{idle_pids}->{$pid} = $url_intvl; - PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self); + PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]); } sub event_step { @@ -638,7 +651,7 @@ sub event_step { PublicInbox::Sigfd::sig_setmask($oldset); die $@ if $@; } - goto(&fs_scan_step) if $self->{mdre}; + fs_scan_step($self) if $self->{mdre}; } sub watch_imap_fetch_all ($$) { @@ -689,24 +702,27 @@ sub watch_nntp_fetch_all ($$) { sub poll_fetch_fork ($) { # DS::add_timer callback my ($self, $intvl, $urls) = @{$_[0]}; return if $self->{quit}; + pipe(my ($r, $w)) or die "pipe: $!"; my $oldset = watch_atfork_parent($self); my $pid = fork; if (defined($pid) && $pid == 0) { + close $r; watch_atfork_child($self); if ($urls->[0] =~ m!\Aimaps?://!i) { watch_imap_fetch_all($self, $urls); } else { watch_nntp_fetch_all($self, $urls); } + close $w; _exit(0); } PublicInbox::Sigfd::sig_setmask($oldset); die "fork: $!" unless defined $pid; $self->{poll_pids}->{$pid} = [ $intvl, $urls ]; - PublicInbox::DS::dwaitpid($pid, \&poll_fetch_reap, $self); + PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]); } -sub poll_fetch_reap { # PublicInbox::DS::dwaitpid callback +sub poll_fetch_reap { my ($self, $pid) = @_; my $intvl_urls = delete $self->{poll_pids}->{$pid} or die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; @@ -759,7 +775,7 @@ sub watch_imap_init ($$) { # flesh out common NNTP-specific data structures sub nntp_common_init ($) { my ($self) = @_; - my $cfg = $self->{config}; + my $cfg = $self->{pi_cfg}; my $nn_args = {}; # scheme://authority => Net::NNTP->new arg for my $url (sort keys %{$self->{nntp}}) { my $sec = uri_section(uri_new($url)); @@ -913,7 +929,7 @@ sub nntp_fetch_all ($$$) { $beg = $l_art + 1; warn "I: $url fetching ARTICLE $beg..$end\n"; - my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ }; + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; my ($err, $art); local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : ''; @@ -950,8 +966,8 @@ sub nntp_fetch_all ($$$) { } } elsif ($inboxes eq 'watchspam') { my $eml = PublicInbox::Eml->new(\$raw); - my $arg = [ $self, $eml, "$url ARTICLE $art" ]; - $self->{config}->each_inbox(\&remove_eml_i, $arg); + $self->{pi_cfg}->each_inbox(\&remove_eml_i, + $self, $eml, "$url ARTICLE $art"); } else { die "BUG: destination unknown $inboxes"; } @@ -984,7 +1000,7 @@ sub watch_nntp_init ($$) { } } -sub watch { +sub watch { # main entry point my ($self, $sig, $oldset) = @_; $self->{oldset} = $oldset; $self->{sig} = $sig; @@ -998,7 +1014,7 @@ sub watch { } watch_fs_init($self) if $self->{mdre}; PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done }); - PublicInbox::DS->EventLoop; + PublicInbox::DS->EventLoop; # calls ->event_step _done_for_now($self); } @@ -1050,7 +1066,7 @@ sub fs_scan_step { sub scan { my ($self, $op) = @_; push @{$self->{ops}}, $op; - goto &fs_scan_step; + fs_scan_step($self); } sub _importer_for { @@ -1083,7 +1099,7 @@ sub content_exists ($$) { sub _spamcheck_cb { my ($sc) = @_; - sub { + sub { # this gets called by (V2Writable||Import)->add my ($mime, $ibx) = @_; return if content_exists($ibx, $mime); my $tmp = '';