- # middlewares such as Deflater may write empty strings
- my $write = sub { $self->write($_[0]) if $_[0] ne '' };
- my $close = sub {
- if ($alive) {
- $self->event_write; # watch for readability if done
- } else {
- Danga::Socket::write($self, sub { $self->close });
+sub identity_wcb ($) {
+ my ($self) = @_;
+ sub { $self->write(\($_[0])) if $_[0] ne '' }
+}
+
+sub next_request ($) {
+ my ($self) = @_;
+ $self->watch_write(0);
+ if ($self->{rbuf} eq '') { # wait for next request
+ $self->watch_read(1);
+ } else { # avoid recursion for pipelined requests
+ push @$pipelineq, $self;
+ $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
+ }
+}
+
+sub response_done_cb ($$) {
+ my ($self, $alive) = @_;
+ sub {
+ my $env = $self->{env};
+ $self->{env} = undef;
+ $self->write("0\r\n\r\n") if $alive == 2;
+ $self->write(sub{$alive ? next_request($self) : $self->close});
+ }
+}
+
+sub getline_cb ($$$) {
+ my ($self, $write, $close) = @_;
+ local $/ = \8192;
+ my $forward = $self->{forward};
+ # limit our own running time for fairness with other
+ # clients and to avoid buffering too much:
+ if ($forward) {
+ my $buf = eval { $forward->getline };
+ if (defined $buf) {
+ $write->($buf); # may close in Danga::Socket::write
+ unless ($self->{closed}) {
+ my $next = $self->{pull};
+ if ($self->{write_buf_size}) {
+ $self->write($next);
+ } else {
+ PublicInbox::EvCleanup::asap($next);
+ }
+ return;
+ }
+ } elsif ($@) {
+ err($self, "response ->getline error: $@");
+ $forward = undef;
+ $self->close;