X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=856884bbb9130e7dff9065e05619e98b385d1634;hb=ede8cc1c664e332cfa44bd22c36a31aac1a5fb13;hp=1e51dc41c1dbc46cc5d1011ef7d1b2b6ff95ba86;hpb=2535900d284028258bea3bb2c1598b675f8f963c;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 1e51dc41..856884bb 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -24,8 +24,10 @@ use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); use warnings; use 5.010_001; +use Scalar::Util qw(blessed); use PublicInbox::Syscall qw(:epoll); +use PublicInbox::Tmpfile; use fields ('sock', # underlying socket 'rbuf', # scalarref, usually undef @@ -33,13 +35,16 @@ use fields ('sock', # underlying socket 'wbuf_off', # offset into first element of wbuf to start writing at ); -use Errno qw(EAGAIN EINVAL EEXIST); +use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess carp); require File::Spec; -my $nextq = []; # queue for next_tick -my $WaitPids = []; # list of [ pid, callback, callback_arg ] -my $reap_timer; +my $nextq; # queue for next_tick +my $WaitPids; # list of [ pid, callback, callback_arg ] +my $later_queue; # callbacks +my $EXPMAP; # fd -> [ idle_time, $self ] +our $EXPTIME = 180; # 3 minutes +my ($later_timer, $reap_timer, $exp_timer); our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) @@ -51,6 +56,7 @@ our ( $LoopTimeout, # timeout of event loop in milliseconds $DoneInit, # if we've done the one-time module init yet @Timers, # timers + $in_loop, ); Reset(); @@ -66,8 +72,11 @@ Reset all state =cut sub Reset { %DescriptorMap = (); + $nextq = []; $WaitPids = []; - $reap_timer = undef; + $later_queue = []; + $EXPMAP = {}; + $reap_timer = $later_timer = $exp_timer = undef; @ToClose = (); $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -177,10 +186,12 @@ sub next_tick () { my $q = $nextq; $nextq = []; for (@$q) { - if (ref($_) eq 'CODE') { - $_->(); - } else { + # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: + # https://rt.perl.org/Public/Bug/Display.html?id=114340 + if (blessed($_)) { $_->event_step; + } else { + $_->(); } } } @@ -244,9 +255,11 @@ sub reap_pids { # reentrant SIGCHLD handler (since reap_pids is not reentrant) sub enqueue_reap ($) { push @$nextq, \&reap_pids }; +sub running () { ($SIG{CHLD} // '') eq \&enqueue_reap } + sub EpollEventLoop { - local $SIG{CHLD} = \&enqueue_reap; - while (1) { + local $in_loop = 1; + do { my @events; my $i; my $timeout = RunTimers(); @@ -260,8 +273,8 @@ sub EpollEventLoop { # in that event. $DescriptorMap{$events[$i]->[0]}->event_step; } - return unless PostEventLoop(); - } + } while (PostEventLoop()); + _run_later(); } =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> @@ -488,15 +501,8 @@ sub drop { # PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; - my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND - do { - my $fn = File::Spec->tmpdir . '/wbuf-' . rand; - if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely - unlink($fn) or return drop($self, "unlink($fn) $!"); - } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM - return drop($self, "open: $!"); - } - } until (defined $fh); + my $fh = tmpfile('wbuf', $self->{sock}, 1) or + return drop($self, "tmpfile $!"); $fh->autoflush(1); my $len = bytes::length($$bref) - $off; $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); @@ -631,8 +637,7 @@ sub shutdn ($) { # must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t) sub dwaitpid ($$$) { my ($pid, $cb, $arg) = @_; - my $chld = $SIG{CHLD}; - if (defined($chld) && $chld eq \&enqueue_reap) { + if ($in_loop) { push @$WaitPids, [ $pid, $cb, $arg ]; # We could've just missed our SIGCHLD, cover it, here: @@ -642,6 +647,53 @@ sub dwaitpid ($$$) { } } +sub _run_later () { + my $run = $later_queue; + $later_timer = undef; + $later_queue = []; + $_->() for @$run; +} + +sub later ($) { + my ($cb) = @_; + push @$later_queue, $cb; + $later_timer //= AddTimer(undef, 60, \&_run_later); +} + +sub expire_old () { + my $now = now(); + my $exp = $EXPTIME; + my $old = $now - $exp; + my %new; + while (my ($fd, $v) = each %$EXPMAP) { + my ($idle_time, $ds_obj) = @$v; + if ($idle_time < $old) { + if (!$ds_obj->shutdn) { + $new{$fd} = $v; + } + } else { + $new{$fd} = $v; + } + } + $EXPMAP = \%new; + $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef; +} + +sub update_idle_time { + my ($self) = @_; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = [ now(), $self ]; + $exp_timer //= later(\&expire_old); +} + +sub not_idle_long { + my ($self, $now) = @_; + my $sock = $self->{sock} or return; + my $ary = $EXPMAP->{fileno($sock)} or return; + my $exp_at = $ary->[0] + $EXPTIME; + $exp_at > $now; +} + package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel {