+# middlewares such as Deflater may write empty strings
+sub chunked_wcb ($) {
+ my ($self) = @_;
+ sub {
+ return if $_[0] eq '';
+ msg_more($self, sprintf("%x\r\n", bytes::length($_[0])));
+ msg_more($self, $_[0]);
+
+ # use $self->write(\"\n\n") if you care about real-time
+ # streaming responses, public-inbox WWW does not.
+ msg_more($self, "\r\n");
+ }
+}
+
+sub identity_wcb ($) {
+ my ($self) = @_;
+ sub { $self->write(\($_[0])) if $_[0] ne '' }
+}
+
+sub next_request ($) {
+ my ($self) = @_;
+ if ($self->{rbuf}) {
+ # avoid recursion for pipelined requests
+ PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq;
+ push @$pipelineq, $self;
+ } else { # wait for next request
+ $self->requeue;
+ }
+}
+
+sub response_done_cb ($$) {
+ my ($self, $alive) = @_;
+ sub {
+ my $env = delete $self->{env};
+ $self->write(\"0\r\n\r\n") if $alive == 2;
+ $self->write($alive ? \&next_request : \&close);
+ }
+}
+
+sub getline_response ($$$) {
+ my ($self, $write, $close) = @_;
+ my $pull; # DANGER: self-referential
+ $pull = sub {
+ my $forward = $self->{forward};
+ # limit our own running time for fairness with other
+ # clients and to avoid buffering too much:
+ my $buf = eval {
+ local $/ = \8192;
+ $forward->getline;
+ } if $forward;
+
+ if (defined $buf) {
+ $write->($buf); # may close in PublicInbox::DS::write
+
+ if ($self->{sock}) {
+ my $wbuf = $self->{wbuf} ||= [];
+ push @$wbuf, $pull;
+
+ # wbuf may be populated by $write->($buf),
+ # no need to rearm if so:
+ $self->requeue if scalar(@$wbuf) == 1;
+ return; # likely
+ }
+ } elsif ($@) {
+ err($self, "response ->getline error: $@");
+ $self->close;
+ }
+
+ $pull = undef; # all done!
+ # avoid recursion
+ if (delete $self->{forward}) {
+ eval { $forward->close };
+ if ($@) {
+ err($self, "response ->close error: $@");
+ $self->close; # idempotent
+ }
+ }
+ $forward = undef;
+ $close->(); # call response_done_cb
+ };
+
+ $pull->(); # kick-off!
+}
+