$self->{qx_arg} = $qx_arg;
$self->{qx_fh} = $qx_fh;
$self->{qx_buf} = \$qx_buf;
- my $rpipe; # comes from popen_rd
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
my ($r, $buf);
reread:
- $r = sysread($rpipe, $buf, 65536);
+ $r = sysread($self->{rpipe}, $buf, 65536);
if ($async) {
$async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf);
} elsif (defined $r) {
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
$self->start($limiter, sub { # start_cb, may run later, much later...
- ($rpipe) = @_; # popen_rd result
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj)
- $async = $async->($rpipe, $cb, undef, $self);
+ # PublicInbox::HTTPD::Async->new(rpipe, $cb, $end_obj)
+ $async = $async->($self->{rpipe}, $cb, undef, $self);
# $cb will call ->async_pass or ->close
} else { # generic PSGI
$cb->() while $self->{qx_fh};
$fh->close if $fh; # async-only (psgi_return)
}
+sub rd_hdr ($) {
+ my ($self) = @_;
+ # typically used for reading CGI headers
+ # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm
+ # We also need to check EINTR for generic PSGI servers.
+ my $ret;
+ my $total_rd = 0;
+ my $hdr_buf = $self->{hdr_buf};
+ do {
+ my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
+ length($$hdr_buf));
+ if (defined($r)) {
+ $total_rd += $r;
+ $ret = $self->{parse_hdr}->($total_rd, $hdr_buf);
+ } else {
+ # caller should notify us when it's ready:
+ return if $! == EAGAIN;
+ next if $! == EINTR; # immediate retry
+ log_err($self->{psgi_env}, "error reading header: $!");
+ $ret = [ 500, [], [ "Internal error\n" ] ];
+ }
+ } until (defined $ret);
+ delete $self->{parse_hdr}; # done parsing headers
+ $ret;
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
sub psgi_return {
my ($self, $env, $limiter, $parse_hdr) = @_;
$self->{psgi_env} = $env;
- my $rpipe;
- my $buf = '';
- my $rd_hdr = sub {
- # typically used for reading CGI headers
- # we must loop until EAGAIN for EPOLLET in HTTPD/Async.pm
- # We also need to check EINTR for generic PSGI servers.
- my $ret;
- my $total_rd = 0;
- do {
- my $r = sysread($rpipe, $buf, 4096, length($buf));
- if (defined($r)) {
- $total_rd += $r;
- $ret = $parse_hdr->($r ? $total_rd : 0, \$buf);
- } else {
- # caller should notify us when it's ready:
- return if $! == EAGAIN;
- next if $! == EINTR; # immediate retry
- log_err($env, "error reading header: $!");
- $ret = [ 500, [], [ "Internal error\n" ] ];
- }
- } until (defined $ret);
- $ret;
- };
-
+ $self->{hdr_buf} = \(my $hdr_buf = '');
+ $self->{parse_hdr} = $parse_hdr;
my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
- my $r = $rd_hdr->() or return;
- $rd_hdr = undef; # done reading headers
+ my $r = rd_hdr($self) or return;
my $filter = delete $env->{'qspawn.filter'};
if (scalar(@$r) == 3) { # error
if ($async) {
# calls rpipe->close && ->event_step
$async->close;
} else {
- $rpipe->close;
+ $self->{rpipe}->close;
event_step($self);
}
$wcb->($r);
my $fh = $wcb->($r); # scalar @$r == 2
$fh = filter_fh($fh, $filter) if $filter;
$self->{fh} = $fh;
- $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
+ $async->async_pass($env->{'psgix.io'}, $fh,
+ delete($self->{hdr_buf}));
} else { # for synchronous PSGI servers
require PublicInbox::GetlineBody;
- $r->[2] = PublicInbox::GetlineBody->new($rpipe,
+ $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
\&event_step, $self,
- $buf, $filter);
+ ${$self->{hdr_buf}}, $filter);
$wcb->($r);
}
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
my $start_cb = sub { # may run later, much later...
- ($rpipe) = @_;
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg,
+ # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg,
# $end_obj)
- $async = $async->($rpipe, $cb, undef, $self);
+ $async = $async->($self->{rpipe}, $cb, undef, $self);
# $cb will call ->async_pass or ->close
} else { # generic PSGI
- $cb->() while $rd_hdr;
+ $cb->() while $self->{parse_hdr};
}
};