}
}
+sub _psgi_finish ($$) {
+ my ($self, $env) = @_;
+ my $err = $self->finish;
+ if ($err && !$env->{'qspawn.quiet'}) {
+ $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+ $env->{'psgi.errors'}->print($err);
+ }
+}
+
+sub psgi_qx {
+ my ($self, $env, $limiter, $qx_cb) = @_;
+ my $qx = PublicInbox::Qspawn::Qx->new;
+ my $end = sub {
+ _psgi_finish($self, $env);
+ eval { $qx_cb->($qx) };
+ $qx = undef;
+ };
+ my $rpipe;
+ my $async = $env->{'pi-httpd.async'};
+ my $cb = sub {
+ my $r = sysread($rpipe, my $buf, 8192);
+ if ($async) {
+ $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+ } elsif (defined $r) {
+ $r ? $qx->write($buf) : $end->();
+ } else {
+ return if $!{EAGAIN} || $!{EINTR}; # loop again
+ $end->();
+ }
+ };
+ $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $self->start($limiter, sub { # may run later, much later...
+ ($rpipe) = @_;
+ if ($async) {
+ # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+ $async = $async->($rpipe, $cb, $end);
+ } else { # generic PSGI
+ $cb->() while $qx;
+ }
+ });
+}
+
# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
sub filter_fh ($$) {
my ($fh, $filter) = @_;
my ($self, $env, $limiter, $parse_hdr) = @_;
my ($fh, $rpipe);
my $end = sub {
- my $err = $self->finish;
- if ($err && !$env->{'qspawn.quiet'}) {
- $err = join(' ', @{$self->{args}->[0]}).": $err\n";
- $env->{'psgi.errors'}->print($err);
- }
+ _psgi_finish($self, $env);
$fh->close if $fh; # async-only
};
return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
$parse_hdr->($r, \$buf);
};
- my $res;
+ my $res = delete $env->{'qspawn.response'};
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
my $r = $rd_hdr->() or return;
}
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ my $start_cb = sub { # may run later, much later...
+ ($rpipe) = @_;
+ if ($async) {
+ # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+ $async = $async->($rpipe, $cb, $end);
+ } else { # generic PSGI
+ $cb->() while $rd_hdr;
+ }
+ };
+
+ return $self->start($limiter, $start_cb) if $res;
+
sub {
($res) = @_;
- $self->start($limiter, sub { # may run later, much later...
- ($rpipe) = @_;
- if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, $end);
- } else { # generic PSGI
- $cb->() while $rd_hdr;
- }
- });
+ $self->start($limiter, $start_cb);
};
}
}, $class;
}
+# captures everything into a buffer and executes a callback when done
+package PublicInbox::Qspawn::Qx;
+use strict;
+use warnings;
+
+sub new {
+ my ($class) = @_;
+ my $buf = '';
+ bless \$buf, $class;
+}
+
+# called by PublicInbox::HTTPD::Async ($fh->write)
+sub write {
+ ${$_[0]} .= $_[1];
+ undef;
+}
+
1;