use fields ('sock', # underlying socket
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
- 'event_watch', # bitmask of events the client is interested in
- # (EPOLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
return $keep_running;
}
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD() | EV_ENABLE();
+ ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
+ } else {
+ EV_DISABLE();
+ }
+}
+
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{event_watch} = $ev;
-
_InitPoller();
if ($HaveEpoll) {
retry:
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
+ $ev &= ~EPOLLEXCLUSIVE;
goto retry;
}
die "couldn't add epoll watch for $fd: $!\n";
}
}
elsif ($HaveKQueue) {
- my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
- $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
+ $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
}
} elsif ($! == EAGAIN) {
$self->{wbuf_off} = $off;
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
} else {
return $self->close;
} # while @$wbuf
delete $self->{wbuf};
- $self->watch_write(0);
1; # all done
}
return $self->close;
}
$self->{wbuf} = [ tmpbuf($bref, $written) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
# queue up the unwritten substring:
$self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
$self->write(\($_[1]));
}
-sub watch_chg ($$$) {
- my ($self, $bits, $set) = @_;
+sub watch ($$) {
+ my ($self, $ev) = @_;
my $sock = $self->{sock} or return;
- my $cur = $self->{event_watch};
- my $changes = $cur;
- if ($set) {
- $changes |= $bits;
- } else {
- $changes &= ~$bits;
- }
- return if $changes == $cur;
my $fd = fileno($sock);
if ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
confess("EPOLL_CTL_MOD $!");
} elsif ($HaveKQueue) {
- my $flag = $set ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
- $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
+ $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
- $self->{event_watch} = $changes;
}
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
-
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];