]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/HTTPD/Async.pm
git-http-backend: refactor to support cleanup
[public-inbox.git] / lib / PublicInbox / HTTPD / Async.pm
index ceba738e3ce3f9793c314b16c7a3fe3733b55358..bd2eacbfdfee8a5be8f3e9b423907a9ac16b5851 100644 (file)
@@ -9,29 +9,59 @@ package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(Danga::Socket);
-use fields qw(cb);
+use fields qw(cb cleanup);
 
 sub new {
-       my ($class, $io, $cb) = @_;
+       my ($class, $io, $cb, $cleanup) = @_;
        my $self = fields::new($class);
        IO::Handle::blocking($io, 0);
        $self->SUPER::new($io);
        $self->{cb} = $cb;
+       $self->{cleanup} = $cleanup;
        $self->watch_read(1);
        $self;
 }
 
-sub async_pass { $_[0]->{cb} = $_[1] }
+sub async_pass {
+       my ($self, $io, $fh, $bref) = @_;
+       my $restart_read = sub { $self->watch_read(1) };
+       # In case the client HTTP connection ($io) dies, it
+       # will automatically close this ($self) object.
+       $io->{forward} = $self;
+       $fh->write($$bref);
+       $self->{cb} = sub {
+               my $r = sysread($self->{sock}, $$bref, 8192);
+               if ($r) {
+                       $fh->write($$bref);
+                       if ($io->{write_buf_size}) {
+                               $self->watch_read(0);
+                               $io->write($restart_read); # D::S::write
+                       }
+                       return; # stay in watch_read
+               } elsif (!defined $r) {
+                       return if $!{EAGAIN} || $!{EINTR};
+               }
+
+               # Done! Error handling will happen in $fh->close
+               # called by the {cleanup} handler
+               $io->{forward} = undef;
+               $self->close;
+       }
+}
+
 sub event_read { $_[0]->{cb}->() }
 sub event_hup { $_[0]->{cb}->() }
 sub event_err { $_[0]->{cb}->() }
 sub sysread { shift->{sock}->sysread(@_) }
-sub getline { $_[0]->{sock}->getline };
 
 sub close {
        my $self = shift;
-       $self->{cb} = undef;
+       my $cleanup = $self->{cleanup};
+       $self->{cleanup} = $self->{cb} = undef;
        $self->SUPER::close(@_);
+
+       # we defer this to the next timer loop since close is deferred
+       Danga::Socket->AddTimer(0, $cleanup) if $cleanup;
 }
 
 # do not let ourselves be closed during graceful termination