use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
+ 'rbuf', # scalarref, usually undef
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
);
use Carp qw(croak confess carp);
require File::Spec;
+my $nextq = []; # queue for next_tick
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
$Epoll, # Global epoll fd (or DSKQXS ref)
sub AddTimer {
my ($class, $secs, $coderef) = @_;
- if (!$secs) {
- my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
- unshift(@Timers, $timer);
- return $timer;
- }
-
my $fire_time = now() + $secs;
my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
$cls = "PublicInbox::$_";
last if eval "require $cls";
}
- $cls->import;
+ $cls->import(qw(epoll_ctl epoll_wait));
$Epoll = $cls->new;
}
*EventLoop = *EpollEventLoop;
sub now () { clock_gettime(CLOCK_MONOTONIC) }
+sub next_tick () {
+ my $q = $nextq;
+ $nextq = [];
+ for (@$q) {
+ if (ref($_) eq 'CODE') {
+ $_->();
+ } else {
+ $_->event_step;
+ }
+ }
+}
+
# runs timers and returns milliseconds for next one, or next event loop
sub RunTimers {
- return $LoopTimeout unless @Timers;
+ next_tick();
+
+ return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
my $now = now();
$to_run->[1]->($now) if $to_run->[1];
}
+ # timers may enqueue into nextq:
+ return 0 if (@$nextq || @ToClose);
+
return $LoopTimeout unless @Timers;
# convert time to an even number of milliseconds, adding 1
# now we can close sockets that wanted to close during our event processing.
# (we didn't want to close them during the loop, as we didn't want fd numbers
# being reused and confused during the event loop)
- while (my $sock = shift @ToClose) {
- my $fd = fileno($sock);
-
- # close the socket. (not a PublicInbox::DS close)
- CORE::close($sock);
-
- # and now we can finally remove the fd from the map. see
- # comment above in ->close.
- delete $DescriptorMap{$fd};
- }
-
+ delete($DescriptorMap{fileno($_)}) for @ToClose;
+ @ToClose = (); # let refcounting drop everything all at once
# by default we keep running, unless a postloop callback (either per-object
# or global) cancels it
### I N S T A N C E M E T H O D S
#####################################################################
+sub requeue ($) { push @$nextq, $_[0] }
+
=head2 C<< $obj->close >>
Close the socket.
$written;
}
+sub epbit ($$) { # (sock, default)
+ ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+}
+
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
goto next_buf;
}
} elsif ($! == EAGAIN) {
+ epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
$self->{wbuf_off} = $off;
- watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
} else {
return $self->close;
1; # all done
}
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+ my ($self, $rbuf) = @_;
+ if ($$rbuf eq '') { # who knows how long till we can read again
+ delete $self->{rbuf};
+ } else {
+ $self->{rbuf} = $rbuf;
+ }
+}
+
+sub do_read ($$$;$) {
my ($self, $rbuf, $len, $off) = @_;
- my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+ my $r = sysread(my $sock = $self->{sock}, $$rbuf, $len, $off // 0);
return ($r == 0 ? $self->close : $r) if defined $r;
# common for clients to break connections without warning,
# would be too noisy to log here:
- if (ref($self) eq 'IO::Socket::SSL') {
- my $ev = PublicInbox::TLS::epollbit() or return $self->close;
- watch($self, $ev | EPOLLONESHOT);
- } elsif ($! == EAGAIN) {
- watch($self, EPOLLIN | EPOLLONESHOT);
+ if ($! == EAGAIN) {
+ epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
+ rbuf_idle($self, $rbuf);
+ 0;
} else {
$self->close;
}
if (defined $written) {
return 1 if $written == $to_write;
+ requeue($self); # runs: event_step -> flush_write
} elsif ($! == EAGAIN) {
+ epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
$written = 0;
} else {
return $self->close;
}
+
+ # deal with EAGAIN or partial write:
my $tmpio = tmpio($self, $bref, $written) or return 0;
# wbuf may be an empty array if we're being called inside
# ->flush_write via CODE bref:
push @{$self->{wbuf} ||= []}, $tmpio;
- watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
# queue up the unwritten substring:
my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
$self->{wbuf} = [ $tmpio ];
- watch($self, EPOLLOUT|EPOLLONESHOT);
+ epwait($sock, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
- $self->write(\($_[1]));
+
+ # don't redispatch into NNTPdeflate::write
+ PublicInbox::DS::write($self, \($_[1]));
}
-sub watch ($$) {
- my ($self, $ev) = @_;
- my $sock = $self->{sock} or return;
+sub epwait ($$) {
+ my ($sock, $ev) = @_;
epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
confess("EPOLL_CTL_MOD $!");
- 0;
}
-sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
-
# return true if complete, false if incomplete (or failure)
sub accept_tls_step ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
return 1 if $sock->accept_SSL;
return $self->close if $! != EAGAIN;
- if (my $ev = PublicInbox::TLS::epollbit()) {
- unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
- return watch($self, $ev | EPOLLONESHOT);
- }
- drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+ epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+ unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ 0;
}
+# return true if complete, false if incomplete (or failure)
sub shutdn_tls_step ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
return $self->close if $! != EAGAIN;
- if (my $ev = PublicInbox::TLS::epollbit()) {
- unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
- return watch($self, $ev | EPOLLONESHOT);
- }
- drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+ epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+ unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+ 0;
}
# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
$self->close;
}
}
-
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {