use bytes;
use POSIX ();
use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
+use Fcntl qw(SEEK_SET :DEFAULT);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
our @EXPORT_OK = qw(now msg_more);
use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
+ 'rbuf', # scalarref, usually undef
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
);
-use Errno qw(EAGAIN EINVAL);
+use Errno qw(EAGAIN EINVAL EEXIST);
use Carp qw(croak confess carp);
-use File::Temp qw(tempfile);
+require File::Spec;
+my $nextq = []; # queue for next_tick
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Epoll, # Global epoll fd (or DSKQXS ref)
sub AddTimer {
my ($class, $secs, $coderef) = @_;
- if (!$secs) {
- my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
- unshift(@Timers, $timer);
- return $timer;
- }
-
my $fire_time = now() + $secs;
my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
return if $DoneInit;
$DoneInit = 1;
- if (!PublicInbox::Syscall::epoll_defined()) {
- $Epoll = eval {
- require PublicInbox::DSKQXS;
- PublicInbox::DSKQXS->import;
- PublicInbox::DSKQXS->new;
- };
- } else {
+ if (PublicInbox::Syscall::epoll_defined()) {
$Epoll = epoll_create();
set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+ } else {
+ my $cls;
+ for (qw(DSKQXS DSPoll)) {
+ $cls = "PublicInbox::$_";
+ last if eval "require $cls";
+ }
+ $cls->import(qw(epoll_ctl epoll_wait));
+ $Epoll = $cls->new;
}
*EventLoop = *EpollEventLoop;
}
sub now () { clock_gettime(CLOCK_MONOTONIC) }
+sub next_tick () {
+ my $q = $nextq;
+ $nextq = [];
+ for (@$q) {
+ if (ref($_) eq 'CODE') {
+ $_->();
+ } else {
+ $_->event_step;
+ }
+ }
+}
+
# runs timers and returns milliseconds for next one, or next event loop
sub RunTimers {
- return $LoopTimeout unless @Timers;
+ next_tick();
+
+ return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
my $now = now();
$to_run->[1]->($now) if $to_run->[1];
}
+ # timers may enqueue into nextq:
+ return 0 if (@$nextq || @ToClose);
+
return $LoopTimeout unless @Timers;
# convert time to an even number of milliseconds, adding 1
### I N S T A N C E M E T H O D S
#####################################################################
+sub requeue ($) { push @$nextq, $_[0] }
+
=head2 C<< $obj->close >>
Close the socket.
1; # all done
}
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+ my ($self, $rbuf) = @_;
+ if ($$rbuf eq '') { # who knows how long till we can read again
+ delete $self->{rbuf};
+ } else {
+ $self->{rbuf} = $rbuf;
+ }
+}
+
+sub do_read ($$$;$) {
my ($self, $rbuf, $len, $off) = @_;
- my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+ my $r = sysread($self->{sock}, $$rbuf, $len, $off // 0);
return ($r == 0 ? $self->close : $r) if defined $r;
# common for clients to break connections without warning,
# would be too noisy to log here:
if (ref($self) eq 'IO::Socket::SSL') {
my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+ rbuf_idle($self, $rbuf);
watch($self, $ev | EPOLLONESHOT);
} elsif ($! == EAGAIN) {
+ rbuf_idle($self, $rbuf);
watch($self, EPOLLIN | EPOLLONESHOT);
} else {
$self->close;
# PerlIO::mmap or PerlIO::scalar if needed
sub tmpio ($$$) {
my ($self, $bref, $off) = @_;
- # open(my $fh, '+>>', undef) doesn't set O_APPEND
- my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
- $fh or return drop($self, "tempfile: $@");
- open($fh, '+>>', $path) or return drop($self, "open: $!");
+ my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
+ do {
+ my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
+ if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
+ unlink($fn) or return drop($self, "unlink($fn) $!");
+ } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
+ return drop($self, "open: $!");
+ }
+ } until (defined $fh);
$fh->autoflush(1);
- unlink($path) or return drop($self, "unlink: $!");
my $len = bytes::length($$bref) - $off;
$fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
$fh
return $self->close;
}
my $tmpio = tmpio($self, $bref, $written) or return 0;
- $self->{wbuf} = [ $tmpio ];
+
+ # wbuf may be an empty array if we're being called inside
+ # ->flush_write via CODE bref:
+ push @{$self->{wbuf} ||= []}, $tmpio;
watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
0;
}
-sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
-
# return true if complete, false if incomplete (or failure)
sub accept_tls_step ($) {
my ($self) = @_;
$self->close;
}
}
-
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {