]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/HTTPD/Async.pm
update copyrights for 2018
[public-inbox.git] / lib / PublicInbox / HTTPD / Async.pm
index ceba738e3ce3f9793c314b16c7a3fe3733b55358..842aaf62e93fbfd22ceabc25269857c9282d1c99 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # XXX This is a totally unstable API for public-inbox internal use only
@@ -9,32 +9,73 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(cb);
+use fields qw(cb cleanup);
+require PublicInbox::EvCleanup;
 
 sub new {
-       my ($class, $io, $cb) = @_;
+       my ($class, $io, $cb, $cleanup) = @_;
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
        $self->SUPER::new($io);
        $self->{cb} = $cb;
+       $self->{cleanup} = $cleanup;
        $self->watch_read(1);
        $self;
 }
 
-sub async_pass { $_[0]->{cb} = $_[1] }
-sub event_read { $_[0]->{cb}->() }
-sub event_hup { $_[0]->{cb}->() }
-sub event_err { $_[0]->{cb}->() }
+sub restart_read_cb ($) {
+       my ($self) = @_;
+       sub { $self->watch_read(1) }
+}
+
+sub main_cb ($$$) {
+       my ($http, $fh, $bref) = @_;
+       sub {
+               my ($self) = @_;
+               my $r = sysread($self->{sock}, $$bref, 8192);
+               if ($r) {
+                       $fh->write($$bref);
+                       return if $http->{closed};
+                       if ($http->{write_buf_size}) {
+                               $self->watch_read(0);
+                               $http->write(restart_read_cb($self));
+                       }
+                       # stay in watch_read, but let other clients
+                       # get some work done, too.
+                       return;
+               } elsif (!defined $r) {
+                       return if $!{EAGAIN} || $!{EINTR};
+               }
+
+               # Done! Error handling will happen in $fh->close
+               # called by the {cleanup} handler
+               $http->{forward} = undef;
+               $self->close;
+       }
+}
+
+sub async_pass {
+       my ($self, $http, $fh, $bref) = @_;
+       # In case the client HTTP connection ($http) dies, it
+       # will automatically close this ($self) object.
+       $http->{forward} = $self;
+       $fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
+       $self->{cb} = main_cb($http, $fh, $bref);
+}
+
+sub event_read { $_[0]->{cb}->(@_) }
+sub event_hup { $_[0]->{cb}->(@_) }
+sub event_err { $_[0]->{cb}->(@_) }
 sub sysread { shift->{sock}->sysread(@_) }
-sub getline { $_[0]->{sock}->getline };
 
 sub close {
        my $self = shift;
-       $self->{cb} = undef;
+       my $cleanup = $self->{cleanup};
+       $self->{cleanup} = $self->{cb} = undef;
        $self->SUPER::close(@_);
-}
 
-# do not let ourselves be closed during graceful termination
-sub busy () { $_[0]->{cb} }
+       # we defer this to the next timer loop since close is deferred
+       PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+}
 
 1;