]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/HTTPD/Async.pm
ds: handle deferred DS->close after timers
[public-inbox.git] / lib / PublicInbox / HTTPD / Async.pm
index 54b62451e884730294b7d9f22fef739261aeac9b..35d171506cd52b3253456aca2728d4a68685b30b 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # XXX This is a totally unstable API for public-inbox internal use only
@@ -8,25 +8,31 @@
 package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
-use base qw(Danga::Socket);
+use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
 require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
 
 sub new {
        my ($class, $io, $cb, $cleanup) = @_;
+
+       # no $io? call $cb at the top of the next event loop to
+       # avoid recursion:
+       unless (defined($io)) {
+               PublicInbox::DS::requeue($cb);
+               die 'cleanup unsupported w/o $io' if $cleanup;
+               return;
+       }
+
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
-       $self->SUPER::new($io);
+       $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
        $self->{cb} = $cb;
        $self->{cleanup} = $cleanup;
-       $self->watch_read(1);
        $self;
 }
 
-sub restart_read_cb ($) {
-       my ($self) = @_;
-       sub { $self->watch_read(1) }
-}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
 
 sub main_cb ($$$) {
        my ($http, $fh, $bref) = @_;
@@ -34,22 +40,31 @@ sub main_cb ($$$) {
                my ($self) = @_;
                my $r = sysread($self->{sock}, $$bref, 8192);
                if ($r) {
-                       $fh->write($$bref);
-                       return if $http->{closed};
-                       if ($http->{write_buf_size}) {
-                               $self->watch_read(0);
-                               $http->write(restart_read_cb($self));
+                       $fh->write($$bref); # may call $http->close
+
+                       if ($http->{sock}) { # !closed
+                               if ($http->{wbuf}) {
+                                       # HTTP client could not keep up, so
+                                       # stop reading and buffering.
+                                       $self->watch(0);
+
+                                       # Tell the HTTP socket to restart us
+                                       # when HTTP client is done draining
+                                       # $http->{wbuf}:
+                                       $http->enqueue_restart_pass;
+                               }
+                               # stay in EPOLLIN, but let other clients
+                               # get some work done, too.
+                               return;
                        }
-                       # stay in watch_read, but let other clients
-                       # get some work done, too.
-                       return;
+                       # fall through to close below...
                } elsif (!defined $r) {
-                       return if $!{EAGAIN} || $!{EINTR};
+                       return restart_read($self) if $! == EAGAIN;
                }
 
                # Done! Error handling will happen in $fh->close
                # called by the {cleanup} handler
-               $http->{forward} = undef;
+               delete $http->{forward};
                $self->close;
        }
 }
@@ -63,19 +78,17 @@ sub async_pass {
        $self->{cb} = main_cb($http, $fh, $bref);
 }
 
-sub event_read { $_[0]->{cb}->(@_) }
-sub event_hup { $_[0]->{cb}->(@_) }
-sub event_err { $_[0]->{cb}->(@_) }
-sub sysread { shift->{sock}->sysread(@_) }
+sub event_step { $_[0]->{cb}->(@_) }
 
 sub close {
-       my $self = shift;
-       my $cleanup = $self->{cleanup};
-       $self->{cleanup} = $self->{cb} = undef;
-       $self->SUPER::close(@_);
+       my $self = $_[0];
+       delete $self->{cb};
+       $self->SUPER::close;
 
        # we defer this to the next timer loop since close is deferred
-       PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+       if (my $cleanup = delete $self->{cleanup}) {
+               PublicInbox::DS::requeue($cleanup);
+       }
 }
 
 1;