]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/HTTPD/Async.pm
ds: handle deferred DS->close after timers
[public-inbox.git] / lib / PublicInbox / HTTPD / Async.pm
index 3eb7f75a4eb5932441fcf7fb2551bc528cef53bf..35d171506cd52b3253456aca2728d4a68685b30b 100644 (file)
@@ -11,6 +11,7 @@ use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
 require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
 
 sub new {
        my ($class, $io, $cb, $cleanup) = @_;
@@ -18,8 +19,8 @@ sub new {
        # no $io? call $cb at the top of the next event loop to
        # avoid recursion:
        unless (defined($io)) {
-               PublicInbox::EvCleanup::asap($cb) if $cb;
-               PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+               PublicInbox::DS::requeue($cb);
+               die 'cleanup unsupported w/o $io' if $cleanup;
                return;
        }
 
@@ -33,23 +34,24 @@ sub new {
 
 sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
 
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
-       my ($self) = @_;
-       sub { restart_read($self) }
-}
-
 sub main_cb ($$$) {
        my ($http, $fh, $bref) = @_;
        sub {
                my ($self) = @_;
                my $r = sysread($self->{sock}, $$bref, 8192);
                if ($r) {
-                       $fh->write($$bref);
+                       $fh->write($$bref); # may call $http->close
+
                        if ($http->{sock}) { # !closed
                                if ($http->{wbuf}) {
+                                       # HTTP client could not keep up, so
+                                       # stop reading and buffering.
                                        $self->watch(0);
-                                       $http->write(restart_read_cb($self));
+
+                                       # Tell the HTTP socket to restart us
+                                       # when HTTP client is done draining
+                                       # $http->{wbuf}:
+                                       $http->enqueue_restart_pass;
                                }
                                # stay in EPOLLIN, but let other clients
                                # get some work done, too.
@@ -57,12 +59,12 @@ sub main_cb ($$$) {
                        }
                        # fall through to close below...
                } elsif (!defined $r) {
-                       return restart_read($self) if $!{EAGAIN};
+                       return restart_read($self) if $! == EAGAIN;
                }
 
                # Done! Error handling will happen in $fh->close
                # called by the {cleanup} handler
-               $http->{forward} = undef;
+               delete $http->{forward};
                $self->close;
        }
 }
@@ -79,13 +81,14 @@ sub async_pass {
 sub event_step { $_[0]->{cb}->(@_) }
 
 sub close {
-       my $self = shift;
-       my $cleanup = $self->{cleanup};
-       $self->{cleanup} = $self->{cb} = undef;
-       $self->SUPER::close(@_);
+       my $self = $_[0];
+       delete $self->{cb};
+       $self->SUPER::close;
 
        # we defer this to the next timer loop since close is deferred
-       PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+       if (my $cleanup = delete $self->{cleanup}) {
+               PublicInbox::DS::requeue($cleanup);
+       }
 }
 
 1;