X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FDS.pm;h=6cd527e265403aef4bd4061d6bf381a34c348858;hb=7c83d3e706811095cedab0bf62ac530d7b0f3a5a;hp=d6ef0b8d4753f6907b9176160223989b10381b22;hpb=df5755b40b4ba1d6048042e18d8ea501755b9a02;p=public-inbox.git diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index d6ef0b8d..6cd527e2 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,7 +18,7 @@ use strict; use bytes; use POSIX (); use IO::Handle qw(); -use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET); +use Fcntl qw(SEEK_SET :DEFAULT); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); @@ -28,14 +28,16 @@ use 5.010_001; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket + 'rbuf', # scalarref, usually undef 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at ); -use Errno qw(EAGAIN EINVAL); +use Errno qw(EAGAIN EINVAL EEXIST); use Carp qw(croak confess carp); -use File::Temp qw(tempfile); +require File::Spec; +my $nextq = []; # queue for next_tick our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (or DSKQXS ref) @@ -98,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t sub AddTimer { my ($class, $secs, $coderef) = @_; - if (!$secs) { - my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer'); - unshift(@Timers, $timer); - return $timer; - } - my $fire_time = now() + $secs; my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; @@ -142,15 +138,17 @@ sub _InitPoller return if $DoneInit; $DoneInit = 1; - if (!PublicInbox::Syscall::epoll_defined()) { - $Epoll = eval { - require PublicInbox::DSKQXS; - PublicInbox::DSKQXS->import; - PublicInbox::DSKQXS->new; - }; - } else { + if (PublicInbox::Syscall::epoll_defined()) { $Epoll = epoll_create(); set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0); + } else { + my $cls; + for (qw(DSKQXS DSPoll)) { + $cls = "PublicInbox::$_"; + last if eval "require $cls"; + } + $cls->import(qw(epoll_ctl epoll_wait)); + $Epoll = $cls->new; } *EventLoop = *EpollEventLoop; } @@ -171,9 +169,23 @@ sub FirstTimeEventLoop { sub now () { clock_gettime(CLOCK_MONOTONIC) } +sub next_tick () { + my $q = $nextq; + $nextq = []; + for (@$q) { + if (ref($_) eq 'CODE') { + $_->(); + } else { + $_->event_step; + } + } +} + # runs timers and returns milliseconds for next one, or next event loop sub RunTimers { - return $LoopTimeout unless @Timers; + next_tick(); + + return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers; my $now = now(); @@ -183,6 +195,9 @@ sub RunTimers { $to_run->[1]->($now) if $to_run->[1]; } + # timers may enqueue into nextq: + return 0 if (@$nextq || @ToClose); + return $LoopTimeout unless @Timers; # convert time to an even number of milliseconds, adding 1 @@ -315,6 +330,8 @@ sub new { ### I N S T A N C E M E T H O D S ##################################################################### +sub requeue ($) { push @$nextq, $_[0] } + =head2 C<< $obj->close >> Close the socket. @@ -410,16 +427,27 @@ next_buf: 1; # all done } -sub do_read ($$$$) { +sub rbuf_idle ($$) { + my ($self, $rbuf) = @_; + if ($$rbuf eq '') { # who knows how long till we can read again + delete $self->{rbuf}; + } else { + $self->{rbuf} = $rbuf; + } +} + +sub do_read ($$$;$) { my ($self, $rbuf, $len, $off) = @_; - my $r = sysread($self->{sock}, $$rbuf, $len, $off); + my $r = sysread($self->{sock}, $$rbuf, $len, $off // 0); return ($r == 0 ? $self->close : $r) if defined $r; # common for clients to break connections without warning, # would be too noisy to log here: if (ref($self) eq 'IO::Socket::SSL') { my $ev = PublicInbox::TLS::epollbit() or return $self->close; + rbuf_idle($self, $rbuf); watch($self, $ev | EPOLLONESHOT); } elsif ($! == EAGAIN) { + rbuf_idle($self, $rbuf); watch($self, EPOLLIN | EPOLLONESHOT); } else { $self->close; @@ -438,12 +466,16 @@ sub drop { # PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; - # open(my $fh, '+>>', undef) doesn't set O_APPEND - my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) }; - $fh or return drop($self, "tempfile: $@"); - open($fh, '+>>', $path) or return drop($self, "open: $!"); + my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND + do { + my $fn = File::Spec->tmpdir . '/wbuf-' . rand; + if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely + unlink($fn) or return drop($self, "unlink($fn) $!"); + } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM + return drop($self, "open: $!"); + } + } until (defined $fh); $fh->autoflush(1); - unlink($path) or return drop($self, "unlink: $!"); my $len = bytes::length($$bref) - $off; $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); $fh @@ -499,7 +531,10 @@ sub write { return $self->close; } my $tmpio = tmpio($self, $bref, $written) or return 0; - $self->{wbuf} = [ $tmpio ]; + + # wbuf may be an empty array if we're being called inside + # ->flush_write via CODE bref: + push @{$self->{wbuf} ||= []}, $tmpio; watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -534,8 +569,6 @@ sub watch ($$) { 0; } -sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } - # return true if complete, false if incomplete (or failure) sub accept_tls_step ($) { my ($self) = @_; @@ -572,7 +605,6 @@ sub shutdn ($) { $self->close; } } - package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel {