]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/HTTPD/Async.pm
ds: handle deferred DS->close after timers
[public-inbox.git] / lib / PublicInbox / HTTPD / Async.pm
index dbe8a84abbbd3291116e816a919073d7422c01cb..35d171506cd52b3253456aca2728d4a68685b30b 100644 (file)
@@ -11,6 +11,7 @@ use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
 require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
 
 sub new {
        my ($class, $io, $cb, $cleanup) = @_;
@@ -18,25 +19,20 @@ sub new {
        # no $io? call $cb at the top of the next event loop to
        # avoid recursion:
        unless (defined($io)) {
-               PublicInbox::EvCleanup::asap($cb) if $cb;
-               PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+               PublicInbox::DS::requeue($cb);
+               die 'cleanup unsupported w/o $io' if $cleanup;
                return;
        }
 
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
-       $self->SUPER::new($io);
+       $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
        $self->{cb} = $cb;
        $self->{cleanup} = $cleanup;
-       $self->watch_read(1);
        $self;
 }
 
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
-       my ($self) = @_;
-       sub { $self->watch_read(1) }
-}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
 
 sub main_cb ($$$) {
        my ($http, $fh, $bref) = @_;
@@ -44,24 +40,31 @@ sub main_cb ($$$) {
                my ($self) = @_;
                my $r = sysread($self->{sock}, $$bref, 8192);
                if ($r) {
-                       $fh->write($$bref);
-                       unless ($http->{closed}) { # PublicInbox::DS sets this
-                               if ($http->{write_buf_size}) {
-                                       $self->watch_read(0);
-                                       $http->write(restart_read_cb($self));
+                       $fh->write($$bref); # may call $http->close
+
+                       if ($http->{sock}) { # !closed
+                               if ($http->{wbuf}) {
+                                       # HTTP client could not keep up, so
+                                       # stop reading and buffering.
+                                       $self->watch(0);
+
+                                       # Tell the HTTP socket to restart us
+                                       # when HTTP client is done draining
+                                       # $http->{wbuf}:
+                                       $http->enqueue_restart_pass;
                                }
-                               # stay in watch_read, but let other clients
+                               # stay in EPOLLIN, but let other clients
                                # get some work done, too.
                                return;
                        }
                        # fall through to close below...
                } elsif (!defined $r) {
-                       return if $!{EAGAIN} || $!{EINTR};
+                       return restart_read($self) if $! == EAGAIN;
                }
 
                # Done! Error handling will happen in $fh->close
                # called by the {cleanup} handler
-               $http->{forward} = undef;
+               delete $http->{forward};
                $self->close;
        }
 }
@@ -75,18 +78,17 @@ sub async_pass {
        $self->{cb} = main_cb($http, $fh, $bref);
 }
 
-sub event_read { $_[0]->{cb}->(@_) }
-sub event_hup { $_[0]->{cb}->(@_) }
-sub event_err { $_[0]->{cb}->(@_) }
+sub event_step { $_[0]->{cb}->(@_) }
 
 sub close {
-       my $self = shift;
-       my $cleanup = $self->{cleanup};
-       $self->{cleanup} = $self->{cb} = undef;
-       $self->SUPER::close(@_);
+       my $self = $_[0];
+       delete $self->{cb};
+       $self->SUPER::close;
 
        # we defer this to the next timer loop since close is deferred
-       PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+       if (my $cleanup = delete $self->{cleanup}) {
+               PublicInbox::DS::requeue($cleanup);
+       }
 }
 
 1;