This will allow us to minimize buffering after we wait
(possibly a long time) for readability. This also greatly
reduces the amount of Danga::Socket-specific knowledge we
have in our PSGI code, making it easier for others to
understand.
if (ref $body eq 'ARRAY') {
$write->($_) foreach @$body;
$close->();
+ } elsif ($body->can('async_pass')) { # HTTPD::Async
+ # prevent us from reading the body faster than we
+ # can write to the client
+ my $restart_read = sub { $body->watch_read(1) };
+ $body->async_pass(sub {
+ local $/ = \8192;
+ my $buf = $body->getline;
+ if (defined $buf) {
+ $write->($buf);
+ if ($self->{write_buf}) {
+ $body->watch_read(0);
+ $self->write($restart_read);
+ }
+ return; # continue waiting
+ }
+ $body->close;
+ $close->();
+ });
} else {
my $pull;
$pull = sub {
$self;
}
+sub async_pass { $_[0]->{cb} = $_[1] }
sub event_read { $_[0]->{cb}->() }
sub event_hup { $_[0]->{cb}->() }
sub event_err { $_[0]->{cb}->() }
sub sysread { shift->{sock}->sysread(@_) }
+sub getline { $_[0]->{sock}->getline };
sub close {
my $self = shift;