use fields ('sock', # underlying socket
'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write
'wbuf_off', # offset into first element of wbuf to start writing at
- 'closed', # bool: socket is closed
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
);
# get up to 1000 events
my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
for ($i=0; $i<$evcount; $i++) {
- my $ev = $events[$i];
-
# it's possible epoll_wait returned many events, including some at the end
# that ones in the front triggered unregister-interest actions. if we
# can't find the %sock entry, it's because we're no longer interested
# in that event.
- my PublicInbox::DS $pob = $DescriptorMap{$ev->[0]};
- my $code;
- my $state = $ev->[1];
-
- DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n",
- $ev->[0], ref($pob), $ev->[1], time);
-
- # standard non-profiling codepat
- $pob->event_read if $state & EPOLLIN && ! $pob->{closed};
- $pob->event_write if $state & EPOLLOUT && ! $pob->{closed};
- if ($state & (EPOLLERR|EPOLLHUP)) {
- $pob->event_err if $state & EPOLLERR && ! $pob->{closed};
- $pob->event_hup if $state & EPOLLHUP && ! $pob->{closed};
- }
+ $DescriptorMap{$events[$i]->[0]}->event_step;
}
return unless PostEventLoop();
}
# Fetch handles with read events
while (@poll) {
my ($fd, $state) = splice(@poll, 0, 2);
- next unless $state;
-
- $pob = $DescriptorMap{$fd};
-
- $pob->event_read if $state & POLLIN && ! $pob->{closed};
- $pob->event_write if $state & POLLOUT && ! $pob->{closed};
- $pob->event_err if $state & POLLERR && ! $pob->{closed};
- $pob->event_hup if $state & POLLHUP && ! $pob->{closed};
+ $DescriptorMap{$fd}->event_step if $state;
}
return unless PostEventLoop();
}
foreach my $kev (@ret) {
- my ($fd, $filter, $flags, $fflags) = @$kev;
- my PublicInbox::DS $pob = $DescriptorMap{$fd};
-
- DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n",
- $fd, ref($pob), $flags, time);
-
- $pob->event_read if $filter == IO::KQueue::EVFILT_READ() && !$pob->{closed};
- $pob->event_write if $filter == IO::KQueue::EVFILT_WRITE() && !$pob->{closed};
- if ($flags == IO::KQueue::EV_EOF() && !$pob->{closed}) {
- if ($fflags) {
- $pob->event_err;
- } else {
- $pob->event_hup;
- }
- }
+ $DescriptorMap{$kev->[0]}->event_step;
}
return unless PostEventLoop();
}
$sock->close;
# and now we can finally remove the fd from the map. see
- # comment above in _cleanup.
+ # comment above in ->close.
delete $DescriptorMap{$fd};
}
$self->{wbuf} = [];
$self->{wbuf_off} = 0;
- $self->{closed} = 0;
my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
### I N S T A N C E M E T H O D S
#####################################################################
-=head2 C<< $obj->steal_socket() >>
-
-Basically returns our socket and makes it so that we don't try to close it,
-but we do remove it from epoll handlers. THIS CLOSES $self. It is the same
-thing as calling close, except it gives you the socket to use.
-
-=cut
-sub steal_socket {
- my PublicInbox::DS $self = $_[0];
- return if $self->{closed};
-
- # cleanup does most of the work of closing this socket
- $self->_cleanup();
-
- # now undef our internal sock and fd structures so we don't use them
- my $sock = $self->{sock};
- $self->{sock} = undef;
- return $sock;
-}
-
=head2 C<< $obj->close >>
Close the socket.
=cut
sub close {
- my PublicInbox::DS $self = $_[0];
- return if $self->{closed};
-
- # this does most of the work of closing us
- $self->_cleanup();
-
- # defer closing the actual socket until the event loop is done
- # processing this round of events. (otherwise we might reuse fds)
- if (my $sock = delete $self->{sock}) {
- push @ToClose, $sock;
- }
-
- return 0;
-}
-
-### METHOD: _cleanup()
-### Called by our closers so we can clean internal data structures.
-sub _cleanup {
- my PublicInbox::DS $self = $_[0];
-
- # we're effectively closed; we have no fd and sock when we leave here
- $self->{closed} = 1;
+ my ($self) = @_;
+ my $sock = delete $self->{sock} or return;
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
- if ($HaveEpoll && $self->{sock}) {
- my $fd = fileno($self->{sock});
+ if ($HaveEpoll) {
+ my $fd = fileno($sock);
epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
confess("EPOLL_CTL_DEL: $!");
}
# processing an epoll_wait/etc that returned hundreds of fds, one
# of which is not yet processed and is what we're closing. if we
# keep it in DescriptorMap, then the event harnesses can just
- # looked at $pob->{closed} and ignore it. but if it's an
+ # looked at $pob->{sock} == undef and ignore it. but if it's an
# un-accounted for fd, then it (understandably) freak out a bit
# and emit warnings, thinking their state got off.
+
+ # defer closing the actual socket until the event loop is done
+ # processing this round of events. (otherwise we might reuse fds)
+ push @ToClose, $sock;
+
+ return 0;
}
=head2 C<< $obj->sock() >>
# now-dead object does its second write. that is this case. we
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
- return 1 if $self->{closed};
+ return 1 unless $self->{sock};
my $bref;
$self->watch_write(1);
}
-=head2 C<< $obj->read( $bytecount ) >>
-
-Read at most I<bytecount> bytes from the underlying handle; returns scalar
-ref on read, or undef on connection closed.
-
-=cut
-sub read {
- my PublicInbox::DS $self = shift;
- return if $self->{closed};
- my $bytes = shift;
- my $buf;
- my $sock = $self->{sock};
-
- # if this is too high, perl quits(!!). reports on mailing lists
- # don't seem to point to a universal answer. 5MB worked for some,
- # crashed for others. 1MB works for more people. let's go with 1MB
- # for now. :/
- my $req_bytes = $bytes > 1048576 ? 1048576 : $bytes;
-
- my $res = sysread($sock, $buf, $req_bytes, 0);
- DebugLevel >= 2 && $self->debugmsg("sysread = %d; \$! = %d", $res, $!);
-
- if (! $res && $! != EAGAIN) {
- # catches 0=conn closed or undef=error
- return undef;
- }
-
- return \$buf;
-}
-
-=head2 (VIRTUAL) C<< $obj->event_read() >>
-
-Readable event handler. Concrete deriviatives of PublicInbox::DS should
-provide an implementation of this. The default implementation will die if
-called.
-
-=cut
-sub event_read { die "Base class event_read called for $_[0]\n"; }
-
-=head2 (VIRTUAL) C<< $obj->event_err() >>
-
-Error event handler. Concrete deriviatives of PublicInbox::DS should
-provide an implementation of this. The default implementation will die if
-called.
-
-=cut
-sub event_err { die "Base class event_err called for $_[0]\n"; }
-
-=head2 (VIRTUAL) C<< $obj->event_hup() >>
-
-'Hangup' event handler. Concrete deriviatives of PublicInbox::DS should
-provide an implementation of this. The default implementation will die if
-called.
-
-=cut
-sub event_hup { die "Base class event_hup called for $_[0]\n"; }
-
-=head2 C<< $obj->event_write() >>
-
-Writable event handler. Concrete deriviatives of PublicInbox::DS may wish to
-provide an implementation of this. The default implementation calls
-C<write()> with an C<undef>.
-
-=cut
-sub event_write {
- my $self = shift;
- $self->write(undef);
-}
-
=head2 C<< $obj->watch_read( $boolean ) >>
Turn 'readable' event notification on or off.
=cut
sub watch_read {
my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
+ my $sock = $self->{sock} or return;
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLIN if ! $val;
$event |= POLLIN if $val;
- my $fd = fileno($self->{sock});
+ my $fd = fileno($sock);
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
=cut
sub watch_write {
my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
+ my $sock = $self->{sock} or return;
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLOUT if ! $val;
$event |= POLLOUT if $val;
- my $fd = fileno($self->{sock});
+ my $fd = fileno($sock);
# If it changed, set it
if ($event != $self->{event_watch}) {
my PublicInbox::DS $self = shift;
my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
- my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
+ my $ret = ref($self) . "$rw: " . ($self->{sock} ? 'open' : 'closed');
return $ret;
}