]> Sergey Matveev's repositories - public-inbox.git/commitdiff
qspawn: implement psgi_qx
authorEric Wong <e@80x24.org>
Wed, 18 Jan 2017 07:27:03 +0000 (07:27 +0000)
committerEric Wong <e@80x24.org>
Tue, 22 Jan 2019 03:40:18 +0000 (03:40 +0000)
This new asynchronous API, will allow us to take
advantage of non-blocking I/O from even small commands;
as those may still need to wait for slow operations.

lib/PublicInbox/Qspawn.pm

index 96fbf38d1cbda2e4c6c939757969c0b90d7b7032..6859a8af31017d5d53d30a69475a48ae3fe06fcd 100644 (file)
@@ -61,6 +61,48 @@ sub start {
        }
 }
 
+sub _psgi_finish ($$) {
+       my ($self, $env) = @_;
+       my $err = $self->finish;
+       if ($err && !$env->{'qspawn.quiet'}) {
+               $err = join(' ', @{$self->{args}->[0]}).": $err\n";
+               $env->{'psgi.errors'}->print($err);
+       }
+}
+
+sub psgi_qx {
+       my ($self, $env, $limiter, $qx_cb) = @_;
+       my $qx = PublicInbox::Qspawn::Qx->new;
+       my $end = sub {
+               _psgi_finish($self, $env);
+               eval { $qx_cb->($qx) };
+               $qx = undef;
+       };
+       my $rpipe;
+       my $async = $env->{'pi-httpd.async'};
+       my $cb = sub {
+               my $r = sysread($rpipe, my $buf, 8192);
+               if ($async) {
+                       $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
+               } elsif (defined $r) {
+                       $r ? $qx->write($buf) : $end->();
+               } else {
+                       return if $!{EAGAIN} || $!{EINTR}; # loop again
+                       $end->();
+               }
+       };
+       $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+       $self->start($limiter, sub { # may run later, much later...
+               ($rpipe) = @_;
+               if ($async) {
+               # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                       $async = $async->($rpipe, $cb, $end);
+               } else { # generic PSGI
+                       $cb->() while $qx;
+               }
+       });
+}
+
 # create a filter for "push"-based streaming PSGI writes used by HTTPD::Async
 sub filter_fh ($$) {
        my ($fh, $filter) = @_;
@@ -78,11 +120,7 @@ sub psgi_return {
        my ($self, $env, $limiter, $parse_hdr) = @_;
        my ($fh, $rpipe);
        my $end = sub {
-               my $err = $self->finish;
-               if ($err && !$env->{'qspawn.quiet'}) {
-                       $err = join(' ', @{$self->{args}->[0]}).": $err\n";
-                       $env->{'psgi.errors'}->print($err);
-               }
+               _psgi_finish($self, $env);
                $fh->close if $fh; # async-only
        };
 
@@ -92,7 +130,7 @@ sub psgi_return {
                return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
                $parse_hdr->($r, \$buf);
        };
-       my $res;
+       my $res = delete $env->{'qspawn.response'};
        my $async = $env->{'pi-httpd.async'};
        my $cb = sub {
                my $r = $rd_hdr->() or return;
@@ -118,17 +156,21 @@ sub psgi_return {
                }
        };
        $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+       my $start_cb = sub { # may run later, much later...
+               ($rpipe) = @_;
+               if ($async) {
+                       # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
+                       $async = $async->($rpipe, $cb, $end);
+               } else { # generic PSGI
+                       $cb->() while $rd_hdr;
+               }
+       };
+
+       return $self->start($limiter, $start_cb) if $res;
+
        sub {
                ($res) = @_;
-               $self->start($limiter, sub { # may run later, much later...
-                       ($rpipe) = @_;
-                       if ($async) {
-                       # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
-                               $async = $async->($rpipe, $cb, $end);
-                       } else { # generic PSGI
-                               $cb->() while $rd_hdr;
-                       }
-               });
+               $self->start($limiter, $start_cb);
        };
 }
 
@@ -146,4 +188,21 @@ sub new {
        }, $class;
 }
 
+# captures everything into a buffer and executes a callback when done
+package PublicInbox::Qspawn::Qx;
+use strict;
+use warnings;
+
+sub new {
+       my ($class) = @_;
+       my $buf = '';
+       bless \$buf, $class;
+}
+
+# called by PublicInbox::HTTPD::Async ($fh->write)
+sub write {
+       ${$_[0]} .= $_[1];
+       undef;
+}
+
 1;