use fields ('sock', # underlying socket
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
- 'event_watch', # bitmask of events the client is interested in
- # (EPOLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
return $keep_running;
}
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD() | EV_ENABLE();
+ ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
+ } else {
+ EV_DISABLE();
+ }
+}
+
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{event_watch} = $ev;
-
_InitPoller();
if ($HaveEpoll) {
retry:
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
+ $ev &= ~EPOLLEXCLUSIVE;
goto retry;
}
die "couldn't add epoll watch for $fd: $!\n";
}
}
elsif ($HaveKQueue) {
- my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
- $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
+ $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
}
} elsif ($! == EAGAIN) {
$self->{wbuf_off} = $off;
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
} else {
return $self->close;
} # while @$wbuf
delete $self->{wbuf};
- $self->watch_write(0);
1; # all done
}
return $self->close;
}
$self->{wbuf} = [ tmpbuf($bref, $written) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
# queue up the unwritten substring:
$self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
$self->write(\($_[1]));
}
-sub watch_chg ($$$) {
- my ($self, $bits, $set) = @_;
+sub watch ($$) {
+ my ($self, $ev) = @_;
my $sock = $self->{sock} or return;
- my $cur = $self->{event_watch};
- my $changes = $cur;
- if ($set) {
- $changes |= $bits;
- } else {
- $changes &= ~$bits;
- }
- return if $changes == $cur;
my $fd = fileno($sock);
if ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
confess("EPOLL_CTL_MOD $!");
} elsif ($HaveKQueue) {
- my $flag = $set ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
- $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
+ $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
- $self->{event_watch} = $changes;
}
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
-
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
use strict;
use warnings;
use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
my $ENABLED;
sub enabled { $ENABLED }
# Called by PublicInbox::DS
sub event_step {
my ($self) = @_;
- $self->watch_write(0);
_run_asap();
}
sub _asap_timer () {
$singleton ||= once_init();
- $singleton->watch_write(1);
+ $singleton->watch(EPOLLOUT|EPOLLONESHOT);
1;
}
use IO::Handle;
require PublicInbox::EvCleanup;
PublicInbox::DS->import(qw(msg_more write_in_full));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
sub new ($$$) {
my ($class, $sock, $addr, $httpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{httpd} = $httpd;
$self->{rbuf} = '';
($self->{remote_addr}, $self->{remote_port}) =
return $self->close if $r == 0;
return rbuf_process($self);
}
- return if $!{EAGAIN}; # no need to call watch_read(1) again
+
+ return $self->watch_in1 if $!{EAGAIN};
# common for clients to break connections without warning,
# would be too noisy to log here:
($r == -2 && length($self->{rbuf}) > 0x4000)) {
return quit($self, 400);
}
- return $self->watch_read(1) if $r < 0; # incomplete
+ return $self->watch_in1 if $r < 0; # incomplete
$self->{rbuf} = substr($self->{rbuf}, $r);
my $len = input_prepare($self, \%env);
sub app_dispatch {
my ($self, $input) = @_;
- $self->watch_read(0);
my $env = $self->{env};
$env->{REMOTE_ADDR} = $self->{remote_addr};
$env->{REMOTE_PORT} = $self->{remote_port};
sub next_request ($) {
my ($self) = @_;
if ($self->{rbuf} eq '') { # wait for next request
- $self->watch_read(1);
+ $self->watch_in1;
} else { # avoid recursion for pipelined requests
push @$pipelineq, $self;
$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
return $self->close if (defined $r && $r == 0);
if ($!{EAGAIN}) {
$self->{input_left} = $len;
- return;
+ return $self->watch_in1;
}
err($self, "error reading for input: $! ($len bytes remaining)");
quit($self, 500);
$self;
}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
+
# fires after pending writes are complete:
sub restart_read_cb ($) {
my ($self) = @_;
- sub { $self->watch_read(1) }
+ sub { restart_read($self) }
}
sub main_cb ($$$) {
$fh->write($$bref);
if ($http->{sock}) { # !closed
if ($http->{wbuf}) {
- $self->watch_read(0);
+ $self->watch(0);
$http->write(restart_read_cb($self));
}
- # stay in watch_read, but let other clients
+ # stay in EPOLLIN, but let other clients
# get some work done, too.
return;
}
# fall through to close below...
} elsif (!defined $r) {
- return if $!{EAGAIN} || $!{EINTR};
+ return restart_read($self) if $!{EAGAIN} || $!{EINTR};
}
# Done! Error handling will happen in $fh->close
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
# pipelined request, we bypassed socket-readiness
# checks to get here:
event_step($nntp);
-
- # maybe there's more pipelined data, or we'll have
- # to register it for socket-readiness notifications
- if (!$nntp->{long_res} && $nntp->{sock}) {
- check_read($nntp);
- }
}
}
}
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
$self->{rbuf} = '';
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
- $self->watch_read(0);
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || !$self->{sock}) {
+ if ($@ || !$self->{sock}) { # something bad happened...
$self->{long_res} = undef;
if ($@) {
my $done = $self->write(\($_[1]));
return 0 unless $self->{sock};
- # Do not watch for readability if we have data in the queue,
- # instead re-enable watching for readability when we can
- $self->watch_read(0) if (!$done || $self->{long_res});
-
$done;
}
my ($self) = @_;
return unless $self->flush_write && $self->{sock};
- return if $self->{long_res};
update_idle_time($self);
# only read more requests if we've drained the write buffer,
my $off = length($$rbuf);
$r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
unless (defined $r) {
- return if $!{EAGAIN};
+ return $self->watch_in1 if $!{EAGAIN};
return $self->close;
}
return $self->close if $r == 0;
my $len = length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
+
+ # maybe there's more pipelined data, or we'll have
+ # to register it for socket-readiness notifications
+ check_read($self) unless ($self->{long_res} || $self->{wbuf});
}
sub check_read {
} else {
# no pipelined requests available, let the kernel know
# to wake us up if there's more
- $self->watch_read(1); # PublicInbox::DS::watch_read
+ $self->watch_in1; # PublicInbox::DS::watch_in1
}
}
@EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait
EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE);
+ EPOLLONESHOT EPOLLEXCLUSIVE);
%EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE)],
+ EPOLLONESHOT EPOLLEXCLUSIVE)],
sendfile => [qw(sendfile)],
);
# use constant EPOLLHUP => 16;
# use constant EPOLLRDBAND => 128;
use constant EPOLLEXCLUSIVE => (1 << 28);
-# use constant EPOLLONESHOT => (1 << 30);
+use constant EPOLLONESHOT => (1 << 30);
# use constant EPOLLET => (1 << 31);
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;