]> Sergey Matveev's repositories - public-inbox.git/commitdiff
qspawn: replace anonymous $end callbacks w/ event_step
authorEric Wong <e@80x24.org>
Wed, 25 Dec 2019 07:50:39 +0000 (07:50 +0000)
committerEric Wong <e@80x24.org>
Thu, 26 Dec 2019 10:48:19 +0000 (10:48 +0000)
This will tie into the DS event loop if that's used, but
event_step an be called directly without relying on the
event loop from Apache or other HTTP servers (or PSGI tests).

lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/Qspawn.pm

index f8cdd1b7f8ea8beb477bd81297a7b12b44722681..750a8c536183e0abeb2cd0eba60b54ab341e6de4 100644 (file)
@@ -13,10 +13,11 @@ use strict;
 use warnings;
 
 sub new {
-       my ($class, $rpipe, $end, $buf, $filter) = @_;
+       my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
        bless {
                rpipe => $rpipe,
                end => $end,
+               end_arg => $end_arg,
                buf => $buf,
                filter => $filter || 0,
        }, $class;
@@ -40,10 +41,9 @@ sub getline {
 
 sub close {
        my ($self) = @_;
-       my $rpipe = delete $self->{rpipe};
+       my ($rpipe, $end, $end_arg) = delete @$self{qw(rpipe end end_arg)};
        close $rpipe if $rpipe;
-       my $end = delete $self->{end};
-       $end->() if $end;
+       $end->($end_arg) if $end;
 }
 
 1;
index d182c11886c5f4b1557d9e0828469fcdb7ae7b28..8956f7198920a2e218ec5ec40a9e7a87cb3ab2df 100644 (file)
@@ -10,7 +10,7 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(cb arg end end_arg);
+use fields qw(cb arg end_obj);
 use Errno qw(EAGAIN);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 
@@ -18,13 +18,13 @@ use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 # $io is a read-only pipe ($rpipe) for now, but may be a
 # bidirectional socket in the future.
 sub new {
-       my ($class, $io, $cb, $arg, $end, $end_arg) = @_;
+       my ($class, $io, $cb, $arg, $end_obj) = @_;
 
        # no $io? call $cb at the top of the next event loop to
        # avoid recursion:
        unless (defined($io)) {
                PublicInbox::DS::requeue($cb ? $cb : $arg);
-               die '$end unsupported w/o $io' if $end;
+               die '$end_obj unsupported w/o $io' if $end_obj;
                return;
        }
 
@@ -33,8 +33,7 @@ sub new {
        $self->SUPER::new($io, EPOLLIN | EPOLLET);
        $self->{cb} = $cb; # initial read callback, later replaced by main_cb
        $self->{arg} = $arg; # arg for $cb
-       $self->{end} = $end; # like END {}, but only for this object
-       $self->{end_arg} = $end_arg; # arg for $end
+       $self->{end_obj} = $end_obj; # like END{}, can ->event_step
        $self;
 }
 
@@ -98,8 +97,11 @@ sub close {
        $self->SUPER::close; # DS::close
 
        # we defer this to the next timer loop since close is deferred
-       if (my $end = delete $self->{end}) {
-               PublicInbox::DS::requeue($end);
+       if (my $end_obj = delete $self->{end_obj}) {
+               # this calls $end_obj->event_step
+               # (likely PublicInbox::Qspawn::event_step,
+               #  NOT PublicInbox::HTTPD::Async::event_step)
+               PublicInbox::DS::requeue($end_obj);
        }
 }
 
index ba980e7397f4510c874cf5ea0da101ac61f816b5..6cb28b9ac061622b5223749474269a26e7cf4a90 100644 (file)
@@ -94,7 +94,8 @@ sub waitpid_err ($$) {
                $err = "W: waitpid($xpid, 0) => $pid: $!";
        } # else should not be called with pid == 0
 
-       my $env = delete $self->{psgi_env};
+       my ($env, $qx_cb, $qx_arg, $qx_buf) =
+               delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
 
        # done, spawn whatever's in the queue
        my $limiter = $self->{limiter};
@@ -112,15 +113,12 @@ sub waitpid_err ($$) {
                        log_err($env, join(' ', @{$self->{args}}) . ": $err");
                }
        }
-       if (my $fin_cb = delete $self->{fin_cb}) {
-               eval { $fin_cb->() }
-       }
+       eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
 }
 
-sub do_waitpid ($;$$) {
-       my ($self, $fin_cb) = @_;
+sub do_waitpid ($) {
+       my ($self) = @_;
        my $pid = $self->{pid};
-       $self->{fin_cb} = $fin_cb;
        # PublicInbox::DS may not be loaded
        eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
        # done if we're running in PublicInbox::DS::EventLoop
@@ -131,12 +129,14 @@ sub do_waitpid ($;$$) {
        }
 }
 
-sub finish ($;$) {
-       my ($self, $fin_cb) = @_;
+sub finish ($) {
+       my ($self) = @_;
        if (delete $self->{rpipe}) {
-               do_waitpid($self, $fin_cb);
-       } elsif ($fin_cb) {
-               eval { $fin_cb->() };
+               do_waitpid($self);
+       } else {
+               my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                       delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+               eval { $qx_cb->($qx_buf, $qx_arg) } if $qx_cb;
        }
 }
 
@@ -154,16 +154,14 @@ sub start {
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-       my ($self, $env, $limiter, $qx_cb, $cb_arg) = @_;
+       my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
        $self->{psgi_env} = $env;
-       my $scalar = '';
-       open(my $qx, '+>', \$scalar) or die; # PerlIO::scalar
-       my $end = sub {
-               my $err = $_[0]; # $!
-               log_err($env, "psgi_qx: $err") if defined($err);
-               finish($self, sub { $qx_cb->(\$scalar, $cb_arg) });
-               $qx = undef;
-       };
+       my $qx_buf = '';
+       open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
+       $self->{qx_cb} = $qx_cb;
+       $self->{qx_arg} = $qx_arg;
+       $self->{qx_fh} = $qx_fh;
+       $self->{qx_buf} = \$qx_buf;
        my $rpipe; # comes from popen_rd
        my $async = $env->{'pi-httpd.async'};
        my $cb = sub {
@@ -171,24 +169,24 @@ sub psgi_qx {
 reread:
                $r = sysread($rpipe, $buf, 65536);
                if ($async) {
-                       $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+                       $async->async_pass($env->{'psgix.io'}, $qx_fh, \$buf);
                } elsif (defined $r) {
-                       $r ? $qx->write($buf) : $end->();
+                       $r ? $qx_fh->write($buf) : event_step($self, undef);
                } else {
                        return if $! == EAGAIN; # try again when notified
                        goto reread if $! == EINTR;
-                       $end->($!);
+                       event_step($self, $!);
                }
        };
        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
        $self->start($limiter, sub { # start_cb, may run later, much later...
                ($rpipe) = @_; # popen_rd result
                if ($async) {
-               # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                       $async = $async->($rpipe, $cb, undef, $end);
+               # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end_obj)
+                       $async = $async->($rpipe, $cb, undef, $self);
                        # $cb will call ->async_pass or ->close
                } else { # generic PSGI
-                       $cb->() while $qx;
+                       $cb->() while $self->{qx_fh};
                }
        });
 }
@@ -206,6 +204,17 @@ sub filter_fh ($$) {
                });
 }
 
+# this is called on pipe EOF to reap the process, may be called
+# via PublicInbox::DS event loop OR via GetlineBody for generic
+# PSGI servers.
+sub event_step {
+       my ($self, $err) = @_; # $err: $!
+       log_err($self->{psgi_env}, "psgi_{return,qx} $err") if defined($err);
+       finish($self);
+       my ($fh, $qx_fh) = delete(@$self{qw(fh qx_fh)});
+       $fh->close if $fh; # async-only (psgi_return)
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -231,14 +240,7 @@ sub filter_fh ($$) {
 sub psgi_return {
        my ($self, $env, $limiter, $parse_hdr) = @_;
        $self->{psgi_env} = $env;
-       my ($fh, $rpipe);
-       my $end = sub {
-               my $err = $_[0]; # $!
-               log_err($env, "psgi_return: $err") if defined($err);
-               finish($self);
-               $fh->close if $fh; # async-only
-       };
-
+       my $rpipe;
        my $buf = '';
        my $rd_hdr = sub {
                # typically used for reading CGI headers
@@ -271,21 +273,24 @@ sub psgi_return {
                my $filter = delete $env->{'qspawn.filter'};
                if (scalar(@$r) == 3) { # error
                        if ($async) {
-                               $async->close; # calls rpipe->close and $end
+                               # calls rpipe->close && ->event_step
+                               $async->close;
                        } else {
                                $rpipe->close;
-                               $end->();
+                               event_step($self);
                        }
                        $wcb->($r);
                } elsif ($async) {
                        # done reading headers, handoff to read body
-                       $fh = $wcb->($r); # scalar @$r == 2
+                       my $fh = $wcb->($r); # scalar @$r == 2
                        $fh = filter_fh($fh, $filter) if $filter;
+                       $self->{fh} = $fh;
                        $async->async_pass($env->{'psgix.io'}, $fh, \$buf);
                } else { # for synchronous PSGI servers
                        require PublicInbox::GetlineBody;
-                       $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end,
-                                                               $buf, $filter);
+                       $r->[2] = PublicInbox::GetlineBody->new($rpipe,
+                                               \&event_step, $self,
+                                               $buf, $filter);
                        $wcb->($r);
                }
 
@@ -297,8 +302,9 @@ sub psgi_return {
        my $start_cb = sub { # may run later, much later...
                ($rpipe) = @_;
                if ($async) {
-                       # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                       $async = $async->($rpipe, $cb, undef, $end);
+                       # PublicInbox::HTTPD::Async->new($rpipe, $cb, $cb_arg,
+                       #                                $end_obj)
+                       $async = $async->($rpipe, $cb, undef, $self);
                        # $cb will call ->async_pass or ->close
                } else { # generic PSGI
                        $cb->() while $rd_hdr;