- my ($self, $env, $limiter, $qx_cb) = @_;
- my $qx = PublicInbox::Qspawn::Qx->new;
- my $end = sub {
- _psgi_finish($self, $env);
- eval { $qx_cb->($qx) };
- $qx = undef;
- };
- my $rpipe;
- my $async = $env->{'pi-httpd.async'};
- my $cb = sub {
- my $r = sysread($rpipe, my $buf, 8192);
- if ($async) {
- $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
- } elsif (defined $r) {
- $r ? $qx->write($buf) : $end->();
+ my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+ $self->{psgi_env} = $env;
+ my $qx_buf = '';
+ open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+ $self->{qx_cb} = $qx_cb;
+ $self->{qx_arg} = $qx_arg;
+ $self->{qx_fh} = $qx_fh;
+ $self->{qx_buf} = \$qx_buf;
+ $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ start($self, $limiter, \&psgi_qx_start);
+}
+
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+ my ($self, $err) = @_; # $err: $!
+ warn "psgi_{return,qx} $err" if defined($err);
+ finish($self);
+ my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+ $fh->close if $fh; # async-only (psgi_return)
+}
+
+sub rd_hdr ($) {
+ my ($self) = @_;
+ # typically used for reading CGI headers
+ # We also need to check EINTR for generic PSGI servers.
+ my $ret;
+ my $total_rd = 0;
+ my $hdr_buf = $self->{hdr_buf};
+ my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+ do {
+ my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
+ length($$hdr_buf));
+ if (defined($r)) {
+ $total_rd += $r;
+ eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) };
+ if ($@) {
+ warn "parse_hdr: $@";
+ $ret = [ 500, [], [ "Internal error\n" ] ];
+ }