return $timeout;
}
-sub event_step ($) {
- my ($self) = @_;
- return if $self->{closed};
-
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- $self->event_write;
- return if $self->{closed} || scalar(@$wbuf);
- }
-
- # only read more requests if we've drained the write buffer,
- # otherwise we can be buffering infinitely w/o backpressure
- $self->event_read;
-}
-
### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
### okay.
sub EpollEventLoop {
# get up to 1000 events
my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events);
for ($i=0; $i<$evcount; $i++) {
- my $ev = $events[$i];
-
# it's possible epoll_wait returned many events, including some at the end
# that ones in the front triggered unregister-interest actions. if we
# can't find the %sock entry, it's because we're no longer interested
# in that event.
- event_step($DescriptorMap{$ev->[0]});
+ $DescriptorMap{$events[$i]->[0]}->event_step;
}
return unless PostEventLoop();
}
# Fetch handles with read events
while (@poll) {
my ($fd, $state) = splice(@poll, 0, 2);
- next unless $state;
-
- event_step($DescriptorMap{$fd});
+ $DescriptorMap{$fd}->event_step if $state;
}
return unless PostEventLoop();
}
foreach my $kev (@ret) {
- my ($fd, $filter, $flags, $fflags) = @$kev;
- event_step($DescriptorMap{$fd});
+ $DescriptorMap{$kev->[0]}->event_step;
}
return unless PostEventLoop();
}
$self->watch_write(1);
}
-=head2 (VIRTUAL) C<< $obj->event_read() >>
-
-Readable event handler. Concrete deriviatives of PublicInbox::DS should
-provide an implementation of this. The default implementation is a noop
-if called.
-
-=cut
-sub event_read {} # noop
-
-=head2 C<< $obj->event_write() >>
-
-Writable event handler. Concrete deriviatives of PublicInbox::DS may wish to
-provide an implementation of this. The default implementation calls
-C<write()> with an C<undef>.
-
-=cut
-sub event_write {
- my $self = shift;
- $self->write(undef);
-}
-
=head2 C<< $obj->watch_read( $boolean ) >>
Turn 'readable' event notification on or off.
fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
$self->SUPER::new($w);
- # always writable, since PublicInbox::EvCleanup::event_write
+ # always writable, since PublicInbox::EvCleanup::event_step
# never drains wbuf. We can avoid wasting a hash slot by
# stuffing the read-end of the pipe into the never-to-be-touched
# wbuf
}
# Called by PublicInbox::DS
-sub event_write {
+sub event_step {
my ($self) = @_;
$self->watch_write(0);
_run_asap();
$self;
}
-sub event_read { # called by PublicInbox::DS
+sub event_step { # called by PublicInbox::DS
my ($self) = @_;
- return event_read_input($self) if defined $self->{env};
+ my $wbuf = $self->{wbuf};
+ if (@$wbuf) {
+ $self->write(undef);
+ return if $self->{closed} || scalar(@$wbuf);
+ }
+ # only read more requests if we've drained the write buffer,
+ # otherwise we can be buffering infinitely w/o backpressure
+
+ return read_input($self) if defined $self->{env};
my $off = length($self->{rbuf});
my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off);
my $len = input_prepare($self, \%env);
defined $len or return write_err($self, undef); # EMFILE/ENFILE
- $len ? event_read_input($self) : app_dispatch($self);
+ $len ? read_input($self) : app_dispatch($self);
}
-sub event_read_input ($) {
+sub read_input ($) {
my ($self) = @_;
my $env = $self->{env};
- return event_read_input_chunked($self) if env_chunked($env);
+ return if $env->{REMOTE_ADDR}; # in app dispatch
+ return read_input_chunked($self) if env_chunked($env);
# env->{CONTENT_LENGTH} (identity)
my $sock = $self->{sock};
sub next_request ($) {
my ($self) = @_;
- $self->watch_write(0);
if ($self->{rbuf} eq '') { # wait for next request
$self->watch_read(1);
} else { # avoid recursion for pipelined requests
$rv
}
-sub event_read_input_chunked { # unlikely...
+sub read_input_chunked { # unlikely...
my ($self) = @_;
my $input = $self->{env}->{'psgi.input'};
my $sock = $self->{sock};
$self->{cb} = main_cb($http, $fh, $bref);
}
-sub event_read { $_[0]->{cb}->(@_) }
+sub event_step { $_[0]->{cb}->(@_) }
sub close {
my $self = shift;
$self
}
-sub event_read {
+sub event_step {
my ($self) = @_;
my $sock = $self->{sock};
} else {
# pipelined request, we bypassed socket-readiness
# checks to get here:
- event_read($nntp);
+ event_step($nntp);
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
do_write($self, $data);
}
-sub event_write {
+sub event_step {
my ($self) = @_;
- update_idle_time($self);
- # only continue watching for readability when we are done writing:
- if ($self->write(undef) == 1 && !$self->{long_res}) {
- $self->watch_read(1);
+ return if $self->{closed};
+
+ my $wbuf = $self->{wbuf};
+ if (@$wbuf) {
+ update_idle_time($self);
+ $self->write(undef);
+ return if $self->{closed} || scalar(@$wbuf);
}
-}
+ return if $self->{long_res};
+ # only read more requests if we've drained the write buffer,
+ # otherwise we can be buffering infinitely w/o backpressure
-sub event_read {
- my ($self) = @_;
use constant LINE_MAX => 512; # RFC 977 section 2.3
my $rbuf = \($self->{rbuf});
my $r;
$self;
}
-sub event_read { $_[0]->{cb}->($_[0]) }
+sub event_step { $_[0]->{cb}->($_[0]) }
1;