# Generic PSGI server for convenience. It aims to provide
# a consistent experience for public-inbox admins so they don't have
# to learn different ways to admin both NNTP and HTTP components.
-# There's nothing public-inbox-specific, here.
+# There's nothing which depends on public-inbox, here.
# Each instance of this class represents a HTTP client socket
package PublicInbox::HTTP;
use strict;
use warnings;
use base qw(Danga::Socket);
-use fields qw(httpd env rbuf input_left remote_addr remote_port forward);
+use fields qw(httpd env rbuf input_left remote_addr remote_port forward pull);
use Fcntl qw(:seek);
use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl
use HTTP::Status qw(status_message);
use HTTP::Date qw(time2str);
+use Scalar::Util qw(weaken);
use IO::File;
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_MAX_HDR => 256,
};
-# FIXME: duplicated code with NNTP.pm
+# FIXME: duplicated code with NNTP.pm, layering violation
my $WEAKEN = {}; # string(inbox) -> inbox
my $weakt;
sub weaken_task () {
return if $_[0] eq '';
more($self, sprintf("%x\r\n", bytes::length($_[0])));
more($self, $_[0]);
- $self->write("\r\n");
+
+ # use $self->write("\n\n") if you care about real-time
+ # streaming responses, public-inbox WWW does not.
+ more($self, "\r\n");
}
}
$self->{env} = undef;
$self->write("0\r\n\r\n") if $alive == 2;
$self->write(sub { $alive ? next_request($self) : $self->close });
+
+ # FIXME: layering violation
if (my $obj = $env->{'pi-httpd.inbox'}) {
# grace period for reaping resources
$WEAKEN->{"$obj"} = $obj;
}
}
+sub getline_response {
+ my ($self, $body, $write, $close) = @_;
+ $self->{forward} = $body;
+ weaken($self);
+ my $pull = $self->{pull} = sub {
+ local $/ = \8192;
+ my $forward = $self->{forward};
+ # limit our own running time for fairness with other
+ # clients and to avoid buffering too much:
+ while ($forward && defined(my $buf = $forward->getline)) {
+ $write->($buf);
+ last if $self->{closed};
+ if ($self->{write_buf_size}) {
+ $self->write($self->{pull});
+ } else {
+ PublicInbox::EvCleanup::asap($self->{pull});
+ }
+ return;
+ }
+ $self->{forward} = $self->{pull} = undef;
+ $forward->close if $forward; # avoid recursion
+ $close->();
+ };
+ $pull->();
+}
+
sub response_write {
my ($self, $env, $res) = @_;
my $alive = response_header_write($self, $env, $res);
$write->($_) foreach @$body;
$close->();
} else {
- my $pull;
- $pull = sub {
- local $/ = \8192;
- while (defined(my $buf = $body->getline)) {
- $write->($buf);
- if ($self->{write_buf_size}) {
- $self->write($pull);
- return;
- }
- }
- $pull = undef;
- $body->close();
- $close->();
- };
- $pull->();
+ getline_response($self, $body, $write, $close);
}
} else {
# this is returned to the calling application:
sub close {
my $self = shift;
my $forward = $self->{forward};
+ my $env = $self->{env};
+ delete $env->{'psgix.io'} if $env; # prevent circular referernces
+ $self->{pull} = $self->{forward} = $self->{env} = undef;
$forward->close if $forward;
- $self->{forward} = $self->{env} = undef;
$self->SUPER::close(@_);
}