X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=d0caa5e73102f13ea27bf455e242b19cbb72220e;hb=4ccff6f9122da89c18ae3f38a130a06de5434826;hp=3989bd2b7c546df59fcd9fbd8683d73cabc0b410;hpb=8acfb1f135a0c3d156ed5cc19a26844c09d20acb;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 3989bd2b..d0caa5e7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -13,42 +13,39 @@ # Bugs encountered were reported to bug-Danga-Socket@rt.cpan.org, # fixed in Danga::Socket 1.62 and visible at: # https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket +# +# fields: +# sock: underlying socket +# rbuf: scalarref, usually undef +# wbuf: arrayref of coderefs or tmpio (autovivified)) +# (tmpio = [ GLOB, offset, [ length ] ]) package PublicInbox::DS; use strict; use bytes; use POSIX qw(WNOHANG); use IO::Handle qw(); -use Fcntl qw(SEEK_SET :DEFAULT); +use Fcntl qw(SEEK_SET :DEFAULT O_APPEND); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); -use warnings; use 5.010_001; use Scalar::Util qw(blessed); - use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; - -use fields ('sock', # underlying socket - 'rbuf', # scalarref, usually undef - 'wbuf', # arrayref of coderefs or GLOB refs - 'wbuf_off', # offset into first element of wbuf to start writing at - ); - use Errno qw(EAGAIN EINVAL); use Carp qw(confess carp); my $nextq; # queue for next_tick -my $WaitPids; # list of [ pid, callback, callback_arg ] -my $later_queue; # callbacks -my $EXPMAP; # fd -> [ idle_time, $self ] +my $wait_pids; # list of [ pid, callback, callback_arg ] +my $later_queue; # list of callbacks to run at some later interval +my $EXPMAP; # fd -> idle_time our $EXPTIME = 180; # 3 minutes -my ($later_timer, $reap_timer, $exp_timer); +my ($later_timer, $reap_armed, $exp_timer); +my $ToClose; # sockets to close when event loop is done our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) $_io, # IO::Handle for Epoll - @ToClose, # sockets to close when event loop is done $PostLoopCallback, # subref to call at the end of each loop, if defined (global) @@ -71,12 +68,9 @@ Reset all state =cut sub Reset { %DescriptorMap = (); - $nextq = []; - $WaitPids = []; - $later_queue = []; + $in_loop = $wait_pids = $later_queue = $reap_armed = undef; $EXPMAP = {}; - $reap_timer = $later_timer = $exp_timer = undef; - @ToClose = (); + $nextq = $ToClose = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default @Timers = (); @@ -101,20 +95,18 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> +=head2 C<< PublicInbox::DS::add_timer( $seconds, $coderef, $arg) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers are not guaranteed to fire at the exact time you ask for. -Returns a timer object which you can call C<< $timer->cancel >> on if you need to. - =cut -sub AddTimer { - my ($class, $secs, $coderef) = @_; +sub add_timer ($$;$) { + my ($secs, $coderef, $arg) = @_; my $fire_time = now() + $secs; - my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; + my $timer = [$fire_time, $coderef, $arg]; if (!@Timers || $fire_time >= $Timers[-1][0]) { push @Timers, $timer; @@ -182,8 +174,8 @@ sub FirstTimeEventLoop { sub now () { clock_gettime(CLOCK_MONOTONIC) } sub next_tick () { - my $q = $nextq; - $nextq = []; + my $q = $nextq or return; + $nextq = undef; for (@$q) { # we avoid "ref" on blessed refs to workaround a Perl 5.16.3 leak: # https://rt.perl.org/Public/Bug/Display.html?id=114340 @@ -199,18 +191,18 @@ sub next_tick () { sub RunTimers { next_tick(); - return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers; + return (($nextq || $ToClose) ? 0 : $LoopTimeout) unless @Timers; my $now = now(); # Run expired timers while (@Timers && $Timers[0][0] <= $now) { my $to_run = shift(@Timers); - $to_run->[1]->($now) if $to_run->[1]; + $to_run->[1]->($to_run->[2]); } # timers may enqueue into nextq: - return 0 if (@$nextq || @ToClose); + return 0 if ($nextq || $ToClose); return $LoopTimeout unless @Timers; @@ -230,29 +222,54 @@ sub RunTimers { } # We can't use waitpid(-1) safely here since it can hit ``, system(), -# and other things. So we scan the $WaitPids list, which is hopefully -# not too big. +# and other things. So we scan the $wait_pids list, which is hopefully +# not too big. We keep $wait_pids small by not calling dwaitpid() +# until we've hit EOF when reading the stdout of the child. + sub reap_pids { - my $tmp = $WaitPids; - $WaitPids = []; - $reap_timer = undef; - foreach my $ary (@$tmp) { - my ($pid, $cb, $arg) = @$ary; - my $ret = waitpid($pid, WNOHANG); - if ($ret == 0) { - push @$WaitPids, $ary; - } elsif ($cb) { - eval { $cb->($arg, $pid) }; - } - } - if (@$WaitPids) { - # we may not be donea, and we may miss our - $reap_timer = AddTimer(undef, 1, \&reap_pids); - } + $reap_armed = undef; + my $tmp = $wait_pids or return; + $wait_pids = undef; + foreach my $ary (@$tmp) { + my ($pid, $cb, $arg) = @$ary; + my $ret = waitpid($pid, WNOHANG); + if ($ret == 0) { + push @$wait_pids, $ary; # autovivifies @$wait_pids + } elsif ($cb) { + eval { $cb->($arg, $pid) }; + } + } + # we may not be done, yet, and could've missed/masked a SIGCHLD: + $reap_armed //= requeue(\&reap_pids) if $wait_pids; } # reentrant SIGCHLD handler (since reap_pids is not reentrant) -sub enqueue_reap ($) { push @$nextq, \&reap_pids }; +sub enqueue_reap () { $reap_armed //= requeue(\&reap_pids) } + +sub in_loop () { $in_loop } + +# Internal function: run the post-event callback, send read events +# for pushed-back data, and close pending connections. returns 1 +# if event loop should continue, or 0 to shut it all down. +sub PostEventLoop () { + # now we can close sockets that wanted to close during our event + # processing. (we didn't want to close them during the loop, as we + # didn't want fd numbers being reused and confused during the event + # loop) + if (my $close_now = $ToClose) { + $ToClose = undef; # will be autovivified on push + @$close_now = map { fileno($_) } @$close_now; + + # order matters, destroy expiry times, first: + delete @$EXPMAP{@$close_now}; + + # ->DESTROY methods may populate ToClose + delete @DescriptorMap{@$close_now}; + } + + # by default we keep running, unless a postloop callback cancels it + $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1; +} sub EpollEventLoop { local $in_loop = 1; @@ -292,28 +309,6 @@ sub SetPostLoopCallback { $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; } -# Internal function: run the post-event callback, send read events -# for pushed-back data, and close pending connections. returns 1 -# if event loop should continue, or 0 to shut it all down. -sub PostEventLoop { - # now we can close sockets that wanted to close during our event processing. - # (we didn't want to close them during the loop, as we didn't want fd numbers - # being reused and confused during the event loop) - delete($DescriptorMap{fileno($_)}) for @ToClose; - @ToClose = (); # let refcounting drop everything all at once - - # by default we keep running, unless a postloop callback (either per-object - # or global) cancels it - my $keep_running = 1; - - # now we're at the very end, call callback if defined - if (defined $PostLoopCallback) { - $keep_running &&= $PostLoopCallback->(\%DescriptorMap); - } - - return $keep_running; -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### @@ -332,13 +327,12 @@ This is normally (always?) called from your subclass via: =cut sub new { my ($self, $sock, $ev) = @_; - $self = fields::new($self) unless ref $self; - $self->{sock} = $sock; my $fd = fileno($sock); _InitPoller(); +retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { $ev &= ~EPOLLEXCLUSIVE; @@ -357,7 +351,7 @@ sub new { ### I N S T A N C E M E T H O D S ##################################################################### -sub requeue ($) { push @$nextq, $_[0] } +sub requeue ($) { push @$nextq, $_[0] } # autovivifies =head2 C<< $obj->close >> @@ -390,17 +384,19 @@ sub close { # defer closing the actual socket until the event loop is done # processing this round of events. (otherwise we might reuse fds) - push @ToClose, $sock; + push @$ToClose, $sock; # autovivifies $ToClose return 0; } # portable, non-thread-safe sendfile emulation (no pread, yet) -sub psendfile ($$$) { - my ($sock, $fh, $off) = @_; +sub send_tmpio ($$) { + my ($sock, $tmpio) = @_; - seek($fh, $$off, SEEK_SET) or return; - defined(my $to_write = read($fh, my $buf, 16384)) or return; + sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return; + my $n = $tmpio->[2] // 65536; + $n = 65536 if $n > 65536; + defined(my $to_write = sysread($tmpio->[0], my $buf, $n)) or return; my $written = 0; while ($to_write > 0) { if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { @@ -411,26 +407,26 @@ sub psendfile ($$$) { last; } } - $$off += $written; + $tmpio->[1] += $written; # offset + $tmpio->[2] -= $written if defined($tmpio->[2]); # length $written; } sub epbit ($$) { # (sock, default) - ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1]; + $_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1]; } # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; + my $sock = $self->{sock} or return; my $wbuf = $self->{wbuf} or return 1; - my $sock = $self->{sock}; next_buf: while (my $bref = $wbuf->[0]) { if (ref($bref) ne 'CODE') { - my $off = delete($self->{wbuf_off}) // 0; while ($sock) { - my $w = psendfile($sock, $bref, \$off); + my $w = send_tmpio($sock, $bref); # bref is tmpio if (defined $w) { if ($w == 0) { shift @$wbuf; @@ -438,13 +434,12 @@ next_buf: } } elsif ($! == EAGAIN) { epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT); - $self->{wbuf_off} = $off; return 0; } else { return $self->close; } } - } else { #($ref eq 'CODE') { + } else { #(ref($bref) eq 'CODE') { shift @$wbuf; my $before = scalar(@$wbuf); $bref->($self); @@ -494,12 +489,12 @@ sub drop { # PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; - my $fh = tmpfile('wbuf', $self->{sock}, 1) or + my $fh = tmpfile('wbuf', $self->{sock}, O_APPEND) or return drop($self, "tmpfile $!"); $fh->autoflush(1); my $len = bytes::length($$bref) - $off; $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); - $fh + [ $fh, 0 ] # [1] = offset, [2] = length, not set by us } =head2 C<< $obj->write( $data ) >> @@ -528,9 +523,9 @@ sub write { if ($ref eq 'CODE') { push @$wbuf, $bref; } else { - my $last = $wbuf->[-1]; - if (ref($last) eq 'GLOB') { # append to tmp file buffer - $last->print($$bref) or return drop($self, "print: $!"); + my $tmpio = $wbuf->[-1]; + if ($tmpio && !defined($tmpio->[2])) { # append to tmp file buffer + $tmpio->[0]->print($$bref) or return drop($self, "print: $!"); } else { my $tmpio = tmpio($self, $bref, 0) or return 0; push @$wbuf, $tmpio; @@ -559,7 +554,7 @@ sub write { # wbuf may be an empty array if we're being called inside # ->flush_write via CODE bref: - push @{$self->{wbuf} ||= []}, $tmpio; + push @{$self->{wbuf}}, $tmpio; # autovivifies return 0; } } @@ -572,15 +567,14 @@ sub msg_more ($$) { my $wbuf = $self->{wbuf}; if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && - ref($sock) ne 'IO::Socket::SSL') { + !$sock->can('stop_SSL')) { my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! # queue up the unwritten substring: my $tmpio = tmpio($self, \($_[1]), $n) or return 0; - $self->{wbuf} //= $wbuf //= []; - push @$wbuf, $tmpio; + push @{$self->{wbuf}}, $tmpio; # autovivifies epwait($sock, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -603,7 +597,7 @@ sub accept_tls_step ($) { return 1 if $sock->accept_SSL; return $self->close if $! != EAGAIN; epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); - unshift @{$self->{wbuf} ||= []}, \&accept_tls_step; + unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies 0; } @@ -614,16 +608,16 @@ sub shutdn_tls_step ($) { return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); return $self->close if $! != EAGAIN; epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT); - unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step; + unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies 0; } # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC -# or fork w/o exec, so no inadvertant socket sharing +# or fork w/o exec, so no inadvertent socket sharing sub shutdn ($) { my ($self) = @_; my $sock = $self->{sock} or return; - if (ref($sock) eq 'IO::Socket::SSL') { + if ($sock->can('stop_SSL')) { shutdn_tls_step($self); } else { $self->close; @@ -632,68 +626,53 @@ sub shutdn ($) { # must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t) sub dwaitpid ($$$) { - my ($pid, $cb, $arg) = @_; - if ($in_loop) { - push @$WaitPids, [ $pid, $cb, $arg ]; + die "Not in EventLoop\n" unless $in_loop; + push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ] - # We could've just missed our SIGCHLD, cover it, here: - requeue(\&reap_pids); - } else { - die "Not in EventLoop\n"; - } + # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap(); } sub _run_later () { - my $run = $later_queue; - $later_timer = undef; - $later_queue = []; - $_->() for @$run; + my $run = $later_queue or return; + $later_timer = $later_queue = undef; + $_->() for @$run; } sub later ($) { - my ($cb) = @_; - push @$later_queue, $cb; - $later_timer //= AddTimer(undef, 60, \&_run_later); + push @$later_queue, $_[0]; # autovivifies @$later_queue + $later_timer //= add_timer(60, \&_run_later); } sub expire_old () { - my $now = now(); - my $exp = $EXPTIME; - my $old = $now - $exp; - my %new; - while (my ($fd, $v) = each %$EXPMAP) { - my ($idle_time, $ds_obj) = @$v; - if ($idle_time < $old) { - if (!$ds_obj->shutdn) { - $new{$fd} = $v; - } - } else { - $new{$fd} = $v; - } - } - $EXPMAP = \%new; - $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef; + my $now = now(); + my $exp = $EXPTIME; + my $old = $now - $exp; + my %new; + while (my ($fd, $idle_at) = each %$EXPMAP) { + if ($idle_at < $old) { + my $ds_obj = $DescriptorMap{$fd}; + $new{$fd} = $idle_at if !$ds_obj->shutdn; + } else { + $new{$fd} = $idle_at; + } + } + $EXPMAP = \%new; + $exp_timer = scalar(keys %new) ? later(\&expire_old) : undef; } sub update_idle_time { - my ($self) = @_; - my $sock = $self->{sock} or return; - $EXPMAP->{fileno($sock)} = [ now(), $self ]; - $exp_timer //= later(\&expire_old); + my ($self) = @_; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = now(); + $exp_timer //= later(\&expire_old); } sub not_idle_long { - my ($self, $now) = @_; - my $sock = $self->{sock} or return; - my $ary = $EXPMAP->{fileno($sock)} or return; - my $exp_at = $ary->[0] + $EXPTIME; - $exp_at > $now; -} - -package PublicInbox::DS::Timer; -# [$abs_float_firetime, $coderef]; -sub cancel { - $_[0][1] = undef; + my ($self, $now) = @_; + my $sock = $self->{sock} or return; + my $idle_at = $EXPMAP->{fileno($sock)} or return; + ($idle_at + $EXPTIME) > $now; } 1;