$ret;
}
+sub psgi_return_init_cb {
+ my ($self) = @_;
+ my $r = rd_hdr($self) or return;
+ my $env = $self->{psgi_env};
+ my $filter = delete $env->{'qspawn.filter'};
+ my $wcb = delete $env->{'qspawn.wcb'};
+ my $async = delete $self->{async};
+ if (scalar(@$r) == 3) { # error
+ if ($async) {
+ # calls rpipe->close && ->event_step
+ $async->close;
+ } else {
+ $self->{rpipe}->close;
+ event_step($self);
+ }
+ $wcb->($r);
+ } elsif ($async) {
+ # done reading headers, handoff to read body
+ my $fh = $wcb->($r); # scalar @$r == 2
+ $fh = filter_fh($fh, $filter) if $filter;
+ $self->{fh} = $fh;
+ $async->async_pass($env->{'psgix.io'}, $fh,
+ delete($self->{hdr_buf}));
+ } else { # for synchronous PSGI servers
+ require PublicInbox::GetlineBody;
+ $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
+ \&event_step, $self,
+ ${$self->{hdr_buf}}, $filter);
+ $wcb->($r);
+ }
+
+ # Workaround a leak under Perl 5.16.3 when combined with
+ # Plack::Middleware::Deflater:
+ $wcb = undef;
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
$self->{parse_hdr} = $parse_hdr;
- my $wcb = delete $env->{'qspawn.wcb'}; # or PSGI server supplies it
- my $async = $env->{'pi-httpd.async'};
-
- my $cb = sub {
- my $r = rd_hdr($self) or return;
- my $filter = delete $env->{'qspawn.filter'};
- if (scalar(@$r) == 3) { # error
- if ($async) {
- # calls rpipe->close && ->event_step
- $async->close;
- } else {
- $self->{rpipe}->close;
- event_step($self);
- }
- $wcb->($r);
- } elsif ($async) {
- # done reading headers, handoff to read body
- my $fh = $wcb->($r); # scalar @$r == 2
- $fh = filter_fh($fh, $filter) if $filter;
- $self->{fh} = $fh;
- $async->async_pass($env->{'psgix.io'}, $fh,
- delete($self->{hdr_buf}));
- } else { # for synchronous PSGI servers
- require PublicInbox::GetlineBody;
- $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
- \&event_step, $self,
- ${$self->{hdr_buf}}, $filter);
- $wcb->($r);
- }
-
- # Workaround a leak under Perl 5.16.3 when combined with
- # Plack::Middleware::Deflater:
- $wcb = undef;
- };
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
my $start_cb = sub { # may run later, much later...
- if ($async) {
+ if (my $async = $env->{'pi-httpd.async'}) {
# PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg,
# $end_obj)
- $async = $async->($self->{rpipe}, $cb, undef, $self);
- # $cb will call ->async_pass or ->close
+ $self->{async} = $async->($self->{rpipe},
+ \&psgi_return_init_cb, $self,
+ $self);
} else { # generic PSGI
- $cb->() while $self->{parse_hdr};
+ psgi_return_init_cb($self) while $self->{parse_hdr};
}
};
# the caller already captured the PSGI write callback from
# the PSGI server, so we can call ->start, here:
- return $self->start($limiter, $start_cb) if $wcb;
+ return $self->start($limiter, $start_cb) if $env->{'qspawn.wcb'};
# the caller will return this sub to the PSGI server, so
# it can set the response callback (that is, for PublicInbox::HTTP,
# the chunked_wcb or identity_wcb callback), but other HTTP servers
# are supported:
sub {
- ($wcb) = @_;
+ $self->{psgi_env}->{'qspawn.wcb'} = $_[0];
$self->start($limiter, $start_cb);
};
}