]> Sergey Matveev's repositories - public-inbox.git/commitdiff
http: support async_pass for Danga::Socket
authorEric Wong <e@80x24.org>
Sat, 21 May 2016 23:45:27 +0000 (23:45 +0000)
committerEric Wong <e@80x24.org>
Sat, 21 May 2016 23:46:06 +0000 (23:46 +0000)
This will allow us to minimize buffering after we wait
(possibly a long time) for readability.  This also greatly
reduces the amount of Danga::Socket-specific knowledge we
have in our PSGI code, making it easier for others to
understand.

lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD/Async.pm

index f69056f87932980cb8dffe2d41ae9d1c3be4297b..d523bd42a3bd6571ec33d6074779a8dea7fe56ab 100644 (file)
@@ -219,6 +219,24 @@ sub response_write {
                if (ref $body eq 'ARRAY') {
                        $write->($_) foreach @$body;
                        $close->();
+               } elsif ($body->can('async_pass')) { # HTTPD::Async
+                       # prevent us from reading the body faster than we
+                       # can write to the client
+                       my $restart_read = sub { $body->watch_read(1) };
+                       $body->async_pass(sub {
+                               local $/ = \8192;
+                               my $buf = $body->getline;
+                               if (defined $buf) {
+                                       $write->($buf);
+                                       if ($self->{write_buf}) {
+                                               $body->watch_read(0);
+                                               $self->write($restart_read);
+                                       }
+                                       return; # continue waiting
+                               }
+                               $body->close;
+                               $close->();
+                       });
                } else {
                        my $pull;
                        $pull = sub {
index bedb397d0f9cdbf7b143615613fb117510549ea4..ceba738e3ce3f9793c314b16c7a3fe3733b55358 100644 (file)
@@ -21,10 +21,12 @@ sub new {
        $self;
 }
 
+sub async_pass { $_[0]->{cb} = $_[1] }
 sub event_read { $_[0]->{cb}->() }
 sub event_hup { $_[0]->{cb}->() }
 sub event_err { $_[0]->{cb}->() }
 sub sysread { shift->{sock}->sysread(@_) }
+sub getline { $_[0]->{sock}->getline };
 
 sub close {
        my $self = shift;