- # only continue watching for readability when we are done writing:
- return if $self->write(undef) != 1;
-
- if ($self->{rbuf} eq '') { # wait for next request
- $self->watch_read(1);
- } else { # avoid recursion for pipelined requests
- push @$pipelineq, $self;
- $next_tick ||= Danga::Socket->AddTimer(0, *process_pipelineq);
+ my $forward = $self->{forward};
+
+ # limit our own running time for fairness with other
+ # clients and to avoid buffering too much:
+ my $buf = eval {
+ local $/ = \65536;
+ $forward->getline;
+ } if $forward;
+
+ if (defined $buf) {
+ # may close in PublicInbox::DS::write
+ if ($self->{alive} == 2) {
+ chunked_write($self, $buf);
+ } else {
+ identity_write($self, $buf);
+ }
+
+ if ($self->{sock}) {
+ # autovivify wbuf
+ my $new_size = push(@{$self->{wbuf}}, \&getline_pull);
+
+ # wbuf may be populated by {chunked,identity}_write()
+ # above, no need to rearm if so:
+ $self->requeue if $new_size == 1;
+ return; # likely
+ }
+ } elsif ($@) {
+ warn "response ->getline error: $@";
+ $self->close;
+ }
+ # avoid recursion
+ if (delete $self->{forward}) {
+ eval { $forward->close };
+ if ($@) {
+ warn "response ->close error: $@";
+ $self->close; # idempotent
+ }
+ }
+ response_done($self, delete $self->{alive});
+}
+
+sub response_write {
+ my ($self, $env, $res) = @_;
+ my $alive = response_header_write($self, $env, $res);
+ if (defined(my $body = $res->[2])) {
+ if (ref $body eq 'ARRAY') {
+ if ($alive == 2) {
+ chunked_write($self, $_) for @$body;
+ } else {
+ identity_write($self, $_) for @$body;
+ }
+ response_done($self, $alive);
+ } else {
+ $self->{forward} = $body;
+ $self->{alive} = $alive;
+ getline_pull($self); # kick-off!
+ }
+ # these are returned to the calling application:
+ } elsif ($alive >= 2) {
+ bless [ $self, $alive ], 'PublicInbox::HTTP::Chunked';
+ } else {
+ bless [ $self, $alive ], 'PublicInbox::HTTP::Identity';