use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(cb arg end end_arg);
+use fields qw(cb arg end_obj);
use Errno qw(EAGAIN);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
# $io is a read-only pipe ($rpipe) for now, but may be a
# bidirectional socket in the future.
sub new {
- my ($class, $io, $cb, $arg, $end, $end_arg) = @_;
+ my ($class, $io, $cb, $arg, $end_obj) = @_;
# no $io? call $cb at the top of the next event loop to
# avoid recursion:
unless (defined($io)) {
PublicInbox::DS::requeue($cb ? $cb : $arg);
- die '$end unsupported w/o $io' if $end;
+ die '$end_obj unsupported w/o $io' if $end_obj;
return;
}
$self->SUPER::new($io, EPOLLIN | EPOLLET);
$self->{cb} = $cb; # initial read callback, later replaced by main_cb
$self->{arg} = $arg; # arg for $cb
- $self->{end} = $end; # like END {}, but only for this object
- $self->{end_arg} = $end_arg; # arg for $end
+ $self->{end_obj} = $end_obj; # like END{}, can ->event_step
$self;
}
$self->SUPER::close; # DS::close
# we defer this to the next timer loop since close is deferred
- if (my $end = delete $self->{end}) {
- PublicInbox::DS::requeue($end);
+ if (my $end_obj = delete $self->{end_obj}) {
+ # this calls $end_obj->event_step
+ # (likely PublicInbox::Qspawn::event_step,
+ # NOT PublicInbox::HTTPD::Async::event_step)
+ PublicInbox::DS::requeue($end_obj);
}
}
$err = "W: waitpid($xpid, 0) => $pid: $!";
} # else should not be called with pid == 0
- my $env = delete $self->{psgi_env};
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
# done, spawn whatever's in the queue
my $limiter = $self->{limiter};
log_err($env, join(' ', @{$self->{args}}) . ": $err");
}
}
- if (my $fin_cb = delete $self->{fin_cb}) {
- eval { $fin_cb->() }
- }
+ eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
}
-sub do_waitpid ($;$$) {
- my ($self, $fin_cb) = @_;
+sub do_waitpid ($) {
+ my ($self) = @_;
my $pid = $self->{pid};
- $self->{fin_cb} = $fin_cb;
# PublicInbox::DS may not be loaded
eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
# done if we're running in PublicInbox::DS::EventLoop
}
}
-sub finish ($;$) {
- my ($self, $fin_cb) = @_;
+sub finish ($) {
+ my ($self) = @_;
if (delete $self->{rpipe}) {
- do_waitpid($self, $fin_cb);
- } elsif ($fin_cb) {
- eval { $fin_cb->() };
+ do_waitpid($self);
+ } else {
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+ eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
}
}
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb, $cb_arg) = @_;
+ my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
$self->{psgi_env} = $env;
- my $scalar = '';
- open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
- my $end = sub {
- my $err = $_[0]; # $!
- log_err($env, "psgi_qx: $err") if defined($err);
- finish($self, sub { $qx_cb->(\$scalar, $cb_arg) });
- $qx = undef;
- };
+ my $qx_buf = '';
+ open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+ $self->{qx_cb} = $qx_cb;
+ $self->{qx_arg} = $qx_arg;
+ $self->{qx_fh} = $qx_fh;
+ $self->{qx_buf} = \$qx_buf;
my $rpipe; # comes from popen_rd
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
reread:
$r = sysread($rpipe, $buf, 65536);
if ($async) {
- $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+ $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf);
} elsif (defined $r) {
- $r ? $qx->write($buf) : $end->();
+ $r ? $qx_fh->write($buf) : event_step($self, undef);
} else {
return if $! == EAGAIN; # try again when notified
goto reread if $! == EINTR;
- $end->($!);
+ event_step($self, $!);
}
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
$self->start($limiter, sub { # start_cb, may run later, much later...
($rpipe) = @_; # popen_rd result
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, undef, $end);
+ # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj)
+ $async = $async->($rpipe, $cb, undef, $self);
# $cb will call ->async_pass or ->close
} else { # generic PSGI
- $cb->() while $qx;
+ $cb->() while $self->{qx_fh};
}
});
}
});
}
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+ my ($self, $err) = @_; # $err: $!
+ log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+ finish($self);
+ my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+ $fh->close if $fh; # async-only (psgi_return)
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
sub psgi_return {
my ($self, $env, $limiter, $parse_hdr) = @_;
$self->{psgi_env} = $env;
- my ($fh, $rpipe);
- my $end = sub {
- my $err = $_[0]; # $!
- log_err($env, "psgi_return: $err") if defined($err);
- finish($self);
- $fh->close if $fh; # async-only
- };
-
+ my $rpipe;
my $buf = '';
my $rd_hdr = sub {
# typically used for reading CGI headers
my $filter = delete $env->{'qspawn.filter'};
if (scalar(@$r) == 3) { # error
if ($async) {
- $async->close; # calls rpipe->close and $end
+ # calls rpipe->close && ->event_step
+ $async->close;
} else {
$rpipe->close;
- $end->();
+ event_step($self);
}
$wcb->($r);
} elsif ($async) {
# done reading headers, handoff to read body
- $fh = $wcb->($r); # scalar @$r == 2
+ my $fh = $wcb->($r); # scalar @$r == 2
$fh = filter_fh($fh, $filter) if $filter;
+ $self->{fh} = $fh;
$async->async_pass($env->{'psgix.io'}, $fh, \$buf);
} else { # for synchronous PSGI servers
require PublicInbox::GetlineBody;
- $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
- $buf, $filter);
+ $r->[2] = PublicInbox::GetlineBody->new($rpipe,
+ \&event_step, $self,
+ $buf, $filter);
$wcb->($r);
}
my $start_cb = sub { # may run later, much later...
($rpipe) = @_;
if ($async) {
- # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
- $async = $async->($rpipe, $cb, undef, $end);
+ # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg,
+ # $end_obj)
+ $async = $async->($rpipe, $cb, undef, $self);
# $cb will call ->async_pass or ->close
} else { # generic PSGI
$cb->() while $rd_hdr;