}
} else { #($ref eq 'CODE') {
shift @$wbuf;
- $bref->();
+ $bref->($self);
}
} # while @$wbuf
}
return 0;
} elsif ($ref eq 'CODE') {
- $bref->();
+ $bref->($self);
return 1;
} else {
my $to_write = bytes::length($$bref);
sub {
my $env = delete $self->{env};
$self->write(\"0\r\n\r\n") if $alive == 2;
- $self->write(sub{$alive ? next_request($self) : $self->close});
+ $self->write($alive ? \&next_request : \&close);
}
}
($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
}
+# fires after pending writes are complete:
+sub restart_pass ($) {
+ $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
+}
+
+sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
+
1;
sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
- my ($self) = @_;
- sub { restart_read($self) }
-}
-
sub main_cb ($$$) {
my ($http, $fh, $bref) = @_;
sub {
my ($self) = @_;
my $r = sysread($self->{sock}, $$bref, 8192);
if ($r) {
- $fh->write($$bref);
+ $fh->write($$bref); # may call $http->close
+
if ($http->{sock}) { # !closed
if ($http->{wbuf}) {
+ # HTTP client could not keep up, so
+ # stop reading and buffering.
$self->watch(0);
- $http->write(restart_read_cb($self));
+
+ # Tell the HTTP socket to restart us
+ # when HTTP client is done draining
+ # $http->{wbuf}:
+ $http->enqueue_restart_pass;
}
# stay in EPOLLIN, but let other clients
# get some work done, too.