use Carp qw(croak confess carp);
use File::Temp qw(tempfile);
-our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
-
our (
- $HaveEpoll, # Flag -- is epoll available? initially undefined.
- $HaveKQueue,
%DescriptorMap, # fd (num) -> PublicInbox::DS object
- $Epoll, # Global epoll fd (for epoll mode only)
- $KQueue, # Global kqueue fd ref (for kqueue mode only)
+ $Epoll, # Global epoll fd (or DSKQXS ref)
$_io, # IO::Handle for Epoll
@ToClose, # sockets to close when event loop is done
$PostLoopCallback = undef;
$DoneInit = 0;
- # NOTE kqueue is close-on-fork, and we don't account for it, yet
- # OTOH, we (public-inbox) don't need this sub outside of tests...
- POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0;
- $KQueue = undef;
-
- $_io = undef; # close $Epoll
- $Epoll = undef;
+ $_io = undef; # closes real $Epoll FD
+ $Epoll = undef; # may call DSKQXS::DESTROY
*EventLoop = *FirstTimeEventLoop;
}
return if $DoneInit;
$DoneInit = 1;
- if ($HAVE_KQUEUE) {
- $KQueue = IO::KQueue->new();
- $HaveKQueue = defined $KQueue;
- if ($HaveKQueue) {
- *EventLoop = *KQueueEventLoop;
- }
- }
- elsif (PublicInbox::Syscall::epoll_defined()) {
- $Epoll = eval { epoll_create(1024); };
- $HaveEpoll = defined $Epoll && $Epoll >= 0;
- if ($HaveEpoll) {
- set_cloexec($Epoll);
- *EventLoop = *EpollEventLoop;
- }
+ if (!PublicInbox::Syscall::epoll_defined()) {
+ $Epoll = eval {
+ require PublicInbox::DSKQXS;
+ PublicInbox::DSKQXS->import;
+ PublicInbox::DSKQXS->new;
+ };
+ } else {
+ $Epoll = epoll_create();
+ set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
}
+ *EventLoop = *EpollEventLoop;
}
=head2 C<< CLASS->EventLoop() >>
_InitPoller();
- if ($HaveEpoll) {
- EpollEventLoop($class);
- } elsif ($HaveKQueue) {
- KQueueEventLoop($class);
- }
+ EventLoop($class);
}
sub now () { clock_gettime(CLOCK_MONOTONIC) }
return $timeout;
}
-### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
-### okay.
sub EpollEventLoop {
- my $class = shift;
-
while (1) {
my @events;
my $i;
}
}
-### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
-### okay.
-sub KQueueEventLoop {
- my $class = shift;
-
- while (1) {
- my $timeout = RunTimers();
- my @ret = eval { $KQueue->kevent($timeout) };
- if (my $err = $@) {
- # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
- if ($err =~ /Interrupted system call/) {
- @ret = ();
- } else {
- die $err;
- }
- }
-
- foreach my $kev (@ret) {
- $DescriptorMap{$kev->[0]}->event_step;
- }
- return unless PostEventLoop();
- }
-}
-
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
Sets post loop callback function. Pass a subref and it will be
return $keep_running;
}
-# map EPOLL* bits to kqueue EV_* flags for EV_SET
-sub kq_flag ($$) {
- my ($bit, $ev) = @_;
- if ($ev & $bit) {
- my $fl = EV_ADD() | EV_ENABLE();
- ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
- } else {
- EV_ADD() | EV_DISABLE();
- }
-}
-
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
_InitPoller();
- if ($HaveEpoll) {
-retry:
- if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
- if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $ev &= ~EPOLLEXCLUSIVE;
- goto retry;
- }
- die "couldn't add epoll watch for $fd: $!\n";
+ if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
+ if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
+ $ev &= ~EPOLLEXCLUSIVE;
+ goto retry;
}
+ die "couldn't add epoll watch for $fd: $!\n";
}
- elsif ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
- $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
- }
-
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
if $DescriptorMap{$fd};
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
- if ($HaveEpoll) {
- my $fd = fileno($sock);
- epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
- confess("EPOLL_CTL_DEL: $!");
- }
+ my $fd = fileno($sock);
+ epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
+ confess("EPOLL_CTL_DEL: $!");
# we explicitly don't delete from DescriptorMap here until we
# actually close the socket, as we might be in the middle of
sub watch ($$) {
my ($self, $ev) = @_;
my $sock = $self->{sock} or return;
- my $fd = fileno($sock);
- if ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
- confess("EPOLL_CTL_MOD $!");
- } elsif ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
- $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
- }
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
+ confess("EPOLL_CTL_MOD $!");
0;
}
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+#
+# kqueue support via IO::KQueue XS module. This makes kqueue look
+# like epoll to simplify the code in DS.pm. This is NOT meant to be
+# an all encompassing emulation of epoll via IO::KQueue, but just to
+# support cases public-inbox-nntpd/httpd care about.
+# A pure-Perl version using syscall() is planned, and it should be
+# faster due to the lack of syscall overhead.
+package PublicInbox::DSKQXS;
+use strict;
+use warnings;
+use parent qw(IO::KQueue);
+use parent qw(Exporter);
+use IO::KQueue;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
+
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD | EV_ENABLE;
+ ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
+ } else {
+ EV_ADD | EV_DISABLE;
+ }
+}
+
+sub new {
+ my ($class) = @_;
+ die 'non-singleton use not supported' if $owner_pid == $$;
+ $owner_pid = $$;
+ $class->SUPER::new;
+}
+
+sub epoll_ctl {
+ my ($self, $op, $fd, $ev) = @_;
+ if ($op != EPOLL_CTL_DEL) {
+ $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev));
+ $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev));
+ }
+ 0;
+}
+
+sub epoll_wait {
+ my ($self, $maxevents, $timeout_msec, $events) = @_;
+ @$events = eval { $self->kevent($timeout_msec) };
+ if (my $err = $@) {
+ # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
+ if ($err =~ /Interrupted system call/) {
+ @$events = ();
+ } else {
+ die $err;
+ }
+ }
+ # caller only cares for $events[$i]->[0]
+ scalar(@$events);
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ if ($owner_pid == $$) {
+ POSIX::close($$self);
+ $owner_pid = -1;
+ }
+}
+
+1;