X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=09dc399250a30382d73592b8015e5ae19a9fabb3;hb=55b707d788ce13696e4411389583e720ea6dab01;hp=17c640f4c1aac251460e09309dd320fb1bffd1fa;hpb=d6674af04cb74a4efd513d938bed8bf7ab2838eb;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 17c640f4..09dc3992 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -35,13 +35,15 @@ use fields ('sock', # underlying socket 'wbuf_off', # offset into first element of wbuf to start writing at ); -use Errno qw(EAGAIN EINVAL); -use Carp qw(croak confess carp); -require File::Spec; - -my $nextq = []; # queue for next_tick -my $WaitPids = []; # list of [ pid, callback, callback_arg ] -my $reap_timer; +use Errno qw(EAGAIN EINVAL); +use Carp qw(confess carp); + +my $nextq; # queue for next_tick +my $WaitPids; # list of [ pid, callback, callback_arg ] +my $later_queue; # callbacks +my $EXPMAP; # fd -> [ idle_time, $self ] +our $EXPTIME = 180; # 3 minutes +my ($later_timer, $reap_timer, $exp_timer); our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) @@ -69,8 +71,11 @@ Reset all state =cut sub Reset { %DescriptorMap = (); + $nextq = []; $WaitPids = []; - $reap_timer = undef; + $later_queue = []; + $EXPMAP = {}; + $reap_timer = $later_timer = $exp_timer = undef; @ToClose = (); $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -251,7 +256,7 @@ sub enqueue_reap ($) { push @$nextq, \&reap_pids }; sub EpollEventLoop { local $in_loop = 1; - while (1) { + do { my @events; my $i; my $timeout = RunTimers(); @@ -265,8 +270,8 @@ sub EpollEventLoop { # in that event. $DescriptorMap{$events[$i]->[0]}->event_step; } - return unless PostEventLoop(); - } + } while (PostEventLoop()); + _run_later(); } =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> @@ -568,15 +573,18 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; sub msg_more ($$) { my $self = $_[0]; my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; - if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') { + if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && + ref($sock) ne 'IO::Socket::SSL') { my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! # queue up the unwritten substring: my $tmpio = tmpio($self, \($_[1]), $n) or return 0; - $self->{wbuf} = [ $tmpio ]; + $self->{wbuf} //= $wbuf //= []; + push @$wbuf, $tmpio; epwait($sock, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -639,6 +647,53 @@ sub dwaitpid ($$$) { } } +sub _run_later () { + my $run = $later_queue; + $later_timer = undef; + $later_queue = []; + $_->() for @$run; +} + +sub later ($) { + my ($cb) = @_; + push @$later_queue, $cb; + $later_timer //= AddTimer(undef, 60, \&_run_later); +} + +sub expire_old () { + my $now = now(); + my $exp = $EXPTIME; + my $old = $now - $exp; + my %new; + while (my ($fd, $v) = each %$EXPMAP) { + my ($idle_time, $ds_obj) = @$v; + if ($idle_time < $old) { + if (!$ds_obj->shutdn) { + $new{$fd} = $v; + } + } else { + $new{$fd} = $v; + } + } + $EXPMAP = \%new; + $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef; +} + +sub update_idle_time { + my ($self) = @_; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = [ now(), $self ]; + $exp_timer //= later(\&expire_old); +} + +sub not_idle_long { + my ($self, $now) = @_; + my $sock = $self->{sock} or return; + my $ary = $EXPMAP->{fileno($sock)} or return; + my $exp_at = $ary->[0] + $EXPTIME; + $exp_at > $now; +} + package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel {