X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=d0aefec0f70bff09789ce645b8f4b83fd03d6a28;hb=8f23c134b6c9bfc9f23b3eed7811082e6d33a84c;hp=7eb0aadad2a3db1ee3e4f4d58a1b395536c1f91f;hpb=0541761b0558317be1825710c1545f6e718103f2;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 7eb0aada..d0aefec0 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -3,15 +3,15 @@ # # This license differs from the rest of public-inbox # -# This is a fork of the (for now) unmaintained Danga::Socket 1.61. -# Unused features will be removed, and updates will be made to take -# advantage of newer kernels. +# This is a fork of the unmaintained Danga::Socket (1.61) with +# significant changes. See Documentation/technical/ds.txt in our +# source for details. # -# API changes to diverge from Danga::Socket will happen to better -# accomodate new features and improve scalability. Do not expect -# this to be a stable API like Danga::Socket. -# Bugs encountered (and likely fixed) are reported to -# bug-Danga-Socket@rt.cpan.org and visible at: +# Do not expect this to be a stable API like Danga::Socket, +# but it will evolve to suite our needs and to take advantage of +# newer Linux and *BSD features. +# Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org, +# fixed in Danga::Socket 1.62 and visible at: # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket package PublicInbox::DS; use strict; @@ -35,19 +35,20 @@ use fields ('sock', # underlying socket 'wbuf_off', # offset into first element of wbuf to start writing at ); -use Errno qw(EAGAIN EINVAL); -use Carp qw(croak confess carp); -require File::Spec; +use Errno qw(EAGAIN EINVAL); +use Carp qw(confess carp); my $nextq; # queue for next_tick my $WaitPids; # list of [ pid, callback, callback_arg ] my $later_queue; # callbacks -my ($later_timer, $reap_timer); +my $EXPMAP; # fd -> [ idle_time, $self ] +our $EXPTIME = 180; # 3 minutes +my ($later_timer, $reap_timer, $exp_timer); +my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) $_io, # IO::Handle for Epoll - @ToClose, # sockets to close when event loop is done $PostLoopCallback, # subref to call at the end of each loop, if defined (global) @@ -73,8 +74,8 @@ sub Reset { $nextq = []; $WaitPids = []; $later_queue = []; - $reap_timer = $later_timer = undef; - @ToClose = (); + $EXPMAP = {}; + $ToClose = $reap_timer = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -99,20 +100,18 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> +=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef ) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. -Returns a timer object which you can call C<< $timer->cancel >> on if you need to. - =cut -sub AddTimer { - my ($class, $secs, $coderef) = @_; +sub add_timer ($$) { + my ($secs, $coderef) = @_; my $fire_time = now() + $secs; - my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; + my $timer = [$fire_time, $coderef]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -197,7 +196,7 @@ sub next_tick () { sub RunTimers { next_tick(); - return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers; + return ((@$nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers; my $now = now(); @@ -208,7 +207,7 @@ sub RunTimers { } # timers may enqueue into nextq: - return 0 if (@$nextq || @ToClose); + return 0 if (@$nextq || $ToClose); return $LoopTimeout unless @Timers; @@ -245,14 +244,33 @@ sub reap_pids { } if (@$WaitPids) { # we may not be donea, and we may miss our - $reap_timer = AddTimer(undef, 1, \&reap_pids); + $reap_timer = add_timer(1, \&reap_pids); } } # reentrant SIGCHLD handler (since reap_pids is not reentrant) sub enqueue_reap ($) { push @$nextq, \&reap_pids }; -sub running () { ($SIG{CHLD} // '') eq \&enqueue_reap } +sub in_loop () { $in_loop } + +# Internal function: run the post-event callback, send read events +# for pushed-back data, and close pending connections. returns 1 +# if event loop should continue, or 0 to shut it all down. +sub PostEventLoop () { + # now we can close sockets that wanted to close during our event + # processing. (we didn't want to close them during the loop, as we + # didn't want fd numbers being reused and confused during the event + # loop) + if (my $close_now = $ToClose) { + $ToClose = undef; # will be autovivified on push + # ->DESTROY methods may populate ToClose + delete($DescriptorMap{fileno($_)}) for @$close_now; + # let refcounting drop everything in $close_now at once + } + + # by default we keep running, unless a postloop callback cancels it + $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; +} sub EpollEventLoop { local $in_loop = 1; @@ -292,28 +310,6 @@ sub SetPostLoopCallback { $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; } -# Internal function: run the post-event callback, send read events -# for pushed-back data, and close pending connections. returns 1 -# if event loop should continue, or 0 to shut it all down. -sub PostEventLoop { - # now we can close sockets that wanted to close during our event processing. - # (we didn't want to close them during the loop, as we didn't want fd numbers - # being reused and confused during the event loop) - delete($DescriptorMap{fileno($_)}) for @ToClose; - @ToClose = (); # let refcounting drop everything all at once - - # by default we keep running, unless a postloop callback (either per-object - # or global) cancels it - my $keep_running = 1; - - # now we're at the very end, call callback if defined - if (defined $PostLoopCallback) { - $keep_running &&= $PostLoopCallback->(\%DescriptorMap); - } - - return $keep_running; -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### @@ -337,9 +333,6 @@ sub new { $self->{sock} = $sock; my $fd = fileno($sock); - Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) - unless $sock && $fd; - _InitPoller(); if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { @@ -349,11 +342,10 @@ sub new { } die "couldn't add epoll watch for $fd: $!\n"; } - Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") - if $DescriptorMap{$fd}; + confess("DescriptorMap{$fd} defined ($DescriptorMap{$fd})") + if defined($DescriptorMap{$fd}); $DescriptorMap{$fd} = $self; - return $self; } @@ -394,7 +386,7 @@ sub close { # defer closing the actual socket until the event loop is done # processing this round of events. (otherwise we might reuse fds) - push @ToClose, $sock; + push @$ToClose, $sock; # autovivifies $ToClose return 0; } @@ -573,15 +565,18 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; sub msg_more ($$) { my $self = $_[0]; my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; - if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') { + if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && + ref($sock) ne 'IO::Socket::SSL') { my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! # queue up the unwritten substring: my $tmpio = tmpio($self, \($_[1]), $n) or return 0; - $self->{wbuf} = [ $tmpio ]; + $self->{wbuf} //= $wbuf //= []; + push @$wbuf, $tmpio; epwait($sock, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -654,13 +649,41 @@ sub _run_later () { sub later ($) { my ($cb) = @_; push @$later_queue, $cb; - $later_timer //= AddTimer(undef, 60, \&_run_later); + $later_timer //= add_timer(60, \&_run_later); } -package PublicInbox::DS::Timer; -# [$abs_float_firetime, $coderef]; -sub cancel { - $_[0][1] = undef; +sub expire_old () { + my $now = now(); + my $exp = $EXPTIME; + my $old = $now - $exp; + my %new; + while (my ($fd, $v) = each %$EXPMAP) { + my ($idle_time, $ds_obj) = @$v; + if ($idle_time < $old) { + if (!$ds_obj->shutdn) { + $new{$fd} = $v; + } + } else { + $new{$fd} = $v; + } + } + $EXPMAP = \%new; + $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef; +} + +sub update_idle_time { + my ($self) = @_; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = [ now(), $self ]; + $exp_timer //= later(\&expire_old); +} + +sub not_idle_long { + my ($self, $now) = @_; + my $sock = $self->{sock} or return; + my $ary = $EXPMAP->{fileno($sock)} or return; + my $exp_at = $ary->[0] + $EXPTIME; + $exp_at > $now; } 1;