X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=3c922ccdf851b01a9e99ed0f9785282b133cb57c;hb=8fa1fb2bb4409bd1be9526fab182a2e607d5fd10;hp=fe794512d50efae39c6c08e623eeb56519a85fb8;hpb=7669985165b5dacb60902406f2b5a894506bdc45;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index fe794512..3c922ccd 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -31,7 +31,7 @@ use PublicInbox::Tmpfile; use fields ('sock', # underlying socket 'rbuf', # scalarref, usually undef - 'wbuf', # arrayref of coderefs or GLOB refs + 'wbuf', # arrayref of coderefs or GLOB refs (autovivified) 'wbuf_off', # offset into first element of wbuf to start writing at ); @@ -39,16 +39,16 @@ use Errno qw(EAGAIN EINVAL); use Carp qw(confess carp); my $nextq; # queue for next_tick -my $WaitPids; # list of [ pid, callback, callback_arg ] +my $wait_pids; # list of [ pid, callback, callback_arg ] my $later_queue; # callbacks my $EXPMAP; # fd -> [ idle_time, $self ] our $EXPTIME = 180; # 3 minutes my ($later_timer, $reap_timer, $exp_timer); +my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) $_io, # IO::Handle for Epoll - @ToClose, # sockets to close when event loop is done $PostLoopCallback, # subref to call at the end of each loop, if defined (global) @@ -71,12 +71,10 @@ Reset all state =cut sub Reset { %DescriptorMap = (); - $nextq = []; - $WaitPids = []; + $wait_pids = undef; $later_queue = []; $EXPMAP = {}; - $reap_timer = $later_timer = $exp_timer = undef; - @ToClose = (); + $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -106,16 +104,13 @@ sub SetLoopTimeout { Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. -Returns a timer object which you can call C<< $timer->cancel >> on if you need -to. - =cut sub add_timer ($$) { my ($secs, $coderef) = @_; my $fire_time = now() + $secs; - my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; + my $timer = [$fire_time, $coderef]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -183,8 +178,8 @@ sub FirstTimeEventLoop { sub now () { clock_gettime(CLOCK_MONOTONIC) } sub next_tick () { - my $q = $nextq; - $nextq = []; + my $q = $nextq or return; + $nextq = undef; for (@$q) { # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: # https://rt.perl.org/Public/Bug/Display.html?id=114340 @@ -200,7 +195,7 @@ sub next_tick () { sub RunTimers { next_tick(); - return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers; + return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers; my $now = now(); @@ -211,7 +206,7 @@ sub RunTimers { } # timers may enqueue into nextq: - return 0 if (@$nextq || @ToClose); + return 0 if ($nextq || $ToClose); return $LoopTimeout unless @Timers; @@ -231,29 +226,48 @@ sub RunTimers { } # We can't use waitpid(-1) safely here since it can hit ``, system(), -# and other things. So we scan the $WaitPids list, which is hopefully -# not too big. +# and other things. So we scan the $wait_pids list, which is hopefully +# not too big. We keep $wait_pids small by not calling dwaitpid() +# until we've hit EOF when reading the stdout of the child. sub reap_pids { - my $tmp = $WaitPids; - $WaitPids = []; - $reap_timer = undef; + my $tmp = $wait_pids or return; + $wait_pids = $reap_timer = undef; foreach my $ary (@$tmp) { my ($pid, $cb, $arg) = @$ary; my $ret = waitpid($pid, WNOHANG); if ($ret == 0) { - push @$WaitPids, $ary; + push @$wait_pids, $ary; # autovivifies @$wait_pids } elsif ($cb) { eval { $cb->($arg, $pid) }; } } - if (@$WaitPids) { - # we may not be donea, and we may miss our - $reap_timer = add_timer(1, \&reap_pids); - } + # we may not be done, yet, and could've missed/masked a SIGCHLD: + $reap_timer = add_timer(1, \&reap_pids) if $wait_pids; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) -sub enqueue_reap ($) { push @$nextq, \&reap_pids }; +sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies + +sub in_loop () { $in_loop } + +# Internal function: run the post-event callback, send read events +# for pushed-back data, and close pending connections. returns 1 +# if event loop should continue, or 0 to shut it all down. +sub PostEventLoop () { + # now we can close sockets that wanted to close during our event + # processing. (we didn't want to close them during the loop, as we + # didn't want fd numbers being reused and confused during the event + # loop) + if (my $close_now = $ToClose) { + $ToClose = undef; # will be autovivified on push + # ->DESTROY methods may populate ToClose + delete($DescriptorMap{fileno($_)}) for @$close_now; + # let refcounting drop everything in $close_now at once + } + + # by default we keep running, unless a postloop callback cancels it + $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; +} sub EpollEventLoop { local $in_loop = 1; @@ -293,28 +307,6 @@ sub SetPostLoopCallback { $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; } -# Internal function: run the post-event callback, send read events -# for pushed-back data, and close pending connections. returns 1 -# if event loop should continue, or 0 to shut it all down. -sub PostEventLoop { - # now we can close sockets that wanted to close during our event processing. - # (we didn't want to close them during the loop, as we didn't want fd numbers - # being reused and confused during the event loop) - delete($DescriptorMap{fileno($_)}) for @ToClose; - @ToClose = (); # let refcounting drop everything all at once - - # by default we keep running, unless a postloop callback (either per-object - # or global) cancels it - my $keep_running = 1; - - # now we're at the very end, call callback if defined - if (defined $PostLoopCallback) { - $keep_running &&= $PostLoopCallback->(\%DescriptorMap); - } - - return $keep_running; -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### @@ -358,7 +350,7 @@ sub new { ### I N S T A N C E M E T H O D S ##################################################################### -sub requeue ($) { push @$nextq, $_[0] } +sub requeue ($) { push @$nextq, $_[0] } # autovivifies =head2 C<< $obj->close >> @@ -391,7 +383,7 @@ sub close { # defer closing the actual socket until the event loop is done # processing this round of events. (otherwise we might reuse fds) - push @ToClose, $sock; + push @$ToClose, $sock; # autovivifies $ToClose return 0; } @@ -560,7 +552,7 @@ sub write { # wbuf may be an empty array if we're being called inside # ->flush_write via CODE bref: - push @{$self->{wbuf} ||= []}, $tmpio; + push @{$self->{wbuf}}, $tmpio; # autovivifies return 0; } } @@ -580,8 +572,7 @@ sub msg_more ($$) { return 1 if $nlen == 0; # all done! # queue up the unwritten substring: my $tmpio = tmpio($self, \($_[1]), $n) or return 0; - $self->{wbuf} //= $wbuf //= []; - push @$wbuf, $tmpio; + push @{$self->{wbuf}}, $tmpio; # autovivifies epwait($sock, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -604,7 +595,7 @@ sub accept_tls_step ($) { return 1 if $sock->accept_SSL; return $self->close if $! != EAGAIN; epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); - unshift @{$self->{wbuf} ||= []}, \&accept_tls_step; + unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies 0; } @@ -615,7 +606,7 @@ sub shutdn_tls_step ($) { return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); return $self->close if $! != EAGAIN; epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); - unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step; + unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies 0; } @@ -633,15 +624,11 @@ sub shutdn ($) { # must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t) sub dwaitpid ($$$) { - my ($pid, $cb, $arg) = @_; - if ($in_loop) { - push @$WaitPids, [ $pid, $cb, $arg ]; + die "Not in EventLoop\n" unless $in_loop; + push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ] - # We could've just missed our SIGCHLD, cover it, here: - requeue(\&reap_pids); - } else { - die "Not in EventLoop\n"; - } + # We could've just missed our SIGCHLD, cover it, here: + requeue(\&reap_pids); } sub _run_later () { @@ -691,12 +678,6 @@ sub not_idle_long { $exp_at > $now; } -package PublicInbox::DS::Timer; -# [$abs_float_firetime, $coderef]; -sub cancel { - $_[0][1] = undef; -} - 1; =head1 AUTHORS (Danga::Socket)