]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/HTTPD/Async.pm
bundle Danga::Socket and Sys::Syscall
[public-inbox.git] / lib / PublicInbox / HTTPD / Async.pm
index 880cf021cef763a97ea9add08cea295d4c9789fa..dbe8a84abbbd3291116e816a919073d7422c01cb 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # XXX This is a totally unstable API for public-inbox internal use only
@@ -8,13 +8,21 @@
 package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
-use base qw(Danga::Socket);
+use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
-use Scalar::Util qw(weaken);
 require PublicInbox::EvCleanup;
 
 sub new {
        my ($class, $io, $cb, $cleanup) = @_;
+
+       # no $io? call $cb at the top of the next event loop to
+       # avoid recursion:
+       unless (defined($io)) {
+               PublicInbox::EvCleanup::asap($cb) if $cb;
+               PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+               return;
+       }
+
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
        $self->SUPER::new($io);
@@ -24,46 +32,52 @@ sub new {
        $self;
 }
 
+# fires after pending writes are complete:
 sub restart_read_cb ($) {
        my ($self) = @_;
        sub { $self->watch_read(1) }
 }
 
-sub async_pass {
-       my ($self, $io, $fh, $bref) = @_;
-       # In case the client HTTP connection ($io) dies, it
-       # will automatically close this ($self) object.
-       $io->{forward} = $self;
-       $fh->write($$bref);
-       my $restart_read = restart_read_cb($self);
-       weaken($self);
-       $self->{cb} = sub {
+sub main_cb ($$$) {
+       my ($http, $fh, $bref) = @_;
+       sub {
+               my ($self) = @_;
                my $r = sysread($self->{sock}, $$bref, 8192);
                if ($r) {
                        $fh->write($$bref);
-                       return if $io->{closed};
-                       if ($io->{write_buf_size}) {
-                               $self->watch_read(0);
-                               $io->write($restart_read); # D::S::write
+                       unless ($http->{closed}) { # PublicInbox::DS sets this
+                               if ($http->{write_buf_size}) {
+                                       $self->watch_read(0);
+                                       $http->write(restart_read_cb($self));
+                               }
+                               # stay in watch_read, but let other clients
+                               # get some work done, too.
+                               return;
                        }
-                       # stay in watch_read, but let other clients
-                       # get some work done, too.
-                       return;
+                       # fall through to close below...
                } elsif (!defined $r) {
                        return if $!{EAGAIN} || $!{EINTR};
                }
 
                # Done! Error handling will happen in $fh->close
                # called by the {cleanup} handler
-               $io->{forward} = undef;
+               $http->{forward} = undef;
                $self->close;
        }
 }
 
-sub event_read { $_[0]->{cb}->() }
-sub event_hup { $_[0]->{cb}->() }
-sub event_err { $_[0]->{cb}->() }
-sub sysread { shift->{sock}->sysread(@_) }
+sub async_pass {
+       my ($self, $http, $fh, $bref) = @_;
+       # In case the client HTTP connection ($http) dies, it
+       # will automatically close this ($self) object.
+       $http->{forward} = $self;
+       $fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
+       $self->{cb} = main_cb($http, $fh, $bref);
+}
+
+sub event_read { $_[0]->{cb}->(@_) }
+sub event_hup { $_[0]->{cb}->(@_) }
+sub event_err { $_[0]->{cb}->(@_) }
 
 sub close {
        my $self = shift;
@@ -72,7 +86,7 @@ sub close {
        $self->SUPER::close(@_);
 
        # we defer this to the next timer loop since close is deferred
-       PublicInbox::EvCleanup::asap($cleanup) if $cleanup;
+       PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
 }
 
 1;