use fields ('sock', # underlying socket
'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write
'wbuf_off', # offset into first element of wbuf to start writing at
- 'closed', # bool: socket is closed
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
);
return $timeout;
}
-# Placeholder callback when we hit POLLERR/POLLHUP or other unrecoverable
-# errors. Shouldn't be needed in the future.
-sub event_end ($) {
- my ($self) = @_;
- return if $self->{closed};
- $self->{wbuf} = [];
- $self->{wbuf_off} = 0;
-
- # we're screwed if a read handler can't handle POLLERR/POLLHUP-type errors
- $self->event_read;
-}
-
### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
### okay.
sub EpollEventLoop {
# get up to 1000 events
my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
for ($i=0; $i<$evcount; $i++) {
- my $ev = $events[$i];
-
# it's possible epoll_wait returned many events, including some at the end
# that ones in the front triggered unregister-interest actions. if we
# can't find the %sock entry, it's because we're no longer interested
# in that event.
- my PublicInbox::DS $pob = $DescriptorMap{$ev->[0]};
- my $code;
- my $state = $ev->[1];
-
- DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
- $ev->[0], ref($pob), $ev->[1], time);
-
- # standard non-profiling codepat
- $pob->event_read if $state & EPOLLIN && ! $pob->{closed};
- $pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
- if ($state & (EPOLLERR|EPOLLHUP) && ! $pob->{closed}) {
- event_end($pob);
- }
+ $DescriptorMap{$events[$i]->[0]}->event_step;
}
return unless PostEventLoop();
}
# Fetch handles with read events
while (@poll) {
my ($fd, $state) = splice(@poll, 0, 2);
- next unless $state;
-
- $pob = $DescriptorMap{$fd};
-
- $pob->event_read if $state & POLLIN && ! $pob->{closed};
- $pob->event_write if $state & POLLOUT && ! $pob->{closed};
- event_end($pob) if $state & (POLLERR|POLLHUP) && ! $pob->{closed};
+ $DescriptorMap{$fd}->event_step if $state;
}
return unless PostEventLoop();
}
foreach my $kev (@ret) {
- my ($fd, $filter, $flags, $fflags) = @$kev;
- my PublicInbox::DS $pob = $DescriptorMap{$fd};
-
- DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
- $fd, ref($pob), $flags, time);
-
- $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
- $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
- if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
- event_end($pob);
- }
+ $DescriptorMap{$kev->[0]}->event_step;
}
return unless PostEventLoop();
}
$sock->close;
# and now we can finally remove the fd from the map. see
- # comment above in _cleanup.
+ # comment above in ->close.
delete $DescriptorMap{$fd};
}
$self->{wbuf} = [];
$self->{wbuf_off} = 0;
- $self->{closed} = 0;
my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
=cut
sub close {
- my PublicInbox::DS $self = $_[0];
- return if $self->{closed};
-
- # this does most of the work of closing us
- $self->_cleanup();
-
- # defer closing the actual socket until the event loop is done
- # processing this round of events. (otherwise we might reuse fds)
- if (my $sock = delete $self->{sock}) {
- push @ToClose, $sock;
- }
-
- return 0;
-}
-
-### METHOD: _cleanup()
-### Called by our closers so we can clean internal data structures.
-sub _cleanup {
- my PublicInbox::DS $self = $_[0];
-
- # we're effectively closed; we have no fd and sock when we leave here
- $self->{closed} = 1;
+ my ($self) = @_;
+ my $sock = delete $self->{sock} or return;
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
- if ($HaveEpoll && $self->{sock}) {
- my $fd = fileno($self->{sock});
+ if ($HaveEpoll) {
+ my $fd = fileno($sock);
epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
confess("EPOLL_CTL_DEL: $!");
}
# processing an epoll_wait/etc that returned hundreds of fds, one
# of which is not yet processed and is what we're closing. if we
# keep it in DescriptorMap, then the event harnesses can just
- # looked at $pob->{closed} and ignore it. but if it's an
+ # looked at $pob->{sock} == undef and ignore it. but if it's an
# un-accounted for fd, then it (understandably) freak out a bit
# and emit warnings, thinking their state got off.
+
+ # defer closing the actual socket until the event loop is done
+ # processing this round of events. (otherwise we might reuse fds)
+ push @ToClose, $sock;
+
+ return 0;
}
=head2 C<< $obj->sock() >>
# now-dead object does its second write. that is this case. we
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
- return 1 if $self->{closed};
+ return 1 unless $self->{sock};
my $bref;
$self->watch_write(1);
}
-=head2 (VIRTUAL) C<< $obj->event_read() >>
-
-Readable event handler. Concrete deriviatives of PublicInbox::DS should
-provide an implementation of this. The default implementation will die if
-called.
-
-=cut
-sub event_read { die "Base class event_read called for $_[0]\n"; }
-
-=head2 C<< $obj->event_write() >>
-
-Writable event handler. Concrete deriviatives of PublicInbox::DS may wish to
-provide an implementation of this. The default implementation calls
-C<write()> with an C<undef>.
-
-=cut
-sub event_write {
- my $self = shift;
- $self->write(undef);
-}
-
=head2 C<< $obj->watch_read( $boolean ) >>
Turn 'readable' event notification on or off.
=cut
sub watch_read {
my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
+ my $sock = $self->{sock} or return;
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLIN if ! $val;
$event |= POLLIN if $val;
- my $fd = fileno($self->{sock});
+ my $fd = fileno($sock);
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
=cut
sub watch_write {
my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
+ my $sock = $self->{sock} or return;
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLOUT if ! $val;
$event |= POLLOUT if $val;
- my $fd = fileno($self->{sock});
+ my $fd = fileno($sock);
# If it changed, set it
if ($event != $self->{event_watch}) {
my PublicInbox::DS $self = shift;
my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
- my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
+ my $ret = ref($self) . "$rw: " . ($self->{sock} ? 'open' : 'closed');
return $ret;
}