]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
Merge remote-tracking branch 'origin/nntp-tls'
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 12ce4e687f574e655e9201fda255552f7241ebb1..53e18281ce412d32d1eb120ccf990b6163d8c625 100644 (file)
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -24,7 +24,7 @@ use constant {
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
 };
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 use Errno qw(EAGAIN);
 
 my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
@@ -45,17 +45,7 @@ sub next_tick () {
        $nextt = undef;
        my $q = $nextq;
        $nextq = [];
-       foreach my $nntp (@$q) {
-               # for request && response protocols, always finish writing
-               # before finishing reading:
-               if (my $long_cb = $nntp->{long_res}) {
-                       $nntp->write($long_cb);
-               } else {
-                       # pipelined request, we bypassed socket-readiness
-                       # checks to get here:
-                       event_step($nntp);
-               }
-       }
+       event_step($_) for @$q;
 }
 
 sub requeue ($) {
@@ -106,18 +96,19 @@ sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
-       my $ev = EPOLLOUT | EPOLLONESHOT;
-       my $wbuf = [];
+       my $ev = EPOLLIN;
+       my $wbuf;
        if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
                $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
-               $ev |= EPOLLONESHOT;
-               $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+               $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
        }
-       $self->SUPER::new($sock, $ev);
+       $self->SUPER::new($sock, $ev | EPOLLONESHOT);
        $self->{nntpd} = $nntpd;
-       push @$wbuf, \&greet;
-       $self->{wbuf} = $wbuf;
-       $self->{rbuf} = '';
+       if ($wbuf) {
+               $self->{wbuf} = $wbuf;
+       } else {
+               greet($self);
+       }
        update_idle_time($self);
        $expt ||= PublicInbox::EvCleanup::later(*expire_old);
        $self;
@@ -633,8 +624,7 @@ sub get_range ($$) {
 }
 
 sub long_response ($$) {
-       my ($self, $cb) = @_;
-       die "BUG: nested long response" if $self->{long_res};
+       my ($self, $cb) = @_; # cb returns true if more, false if done
 
        my $fd = fileno($self->{sock});
        defined $fd or return;
@@ -642,36 +632,38 @@ sub long_response ($$) {
        # clients should not be sending us stuff and making us do more
        # work while we are stream a response to them
        my $t0 = now();
-       $self->{long_res} = sub {
+       my $long_cb; # DANGER: self-referential
+       $long_cb = sub {
+               # wbuf is unset or empty, here; $cb may add to it
                my $more = eval { $cb->() };
                if ($@ || !$self->{sock}) { # something bad happened...
-                       delete $self->{long_res};
-
+                       $long_cb = undef;
+                       my $diff = now() - $t0;
                        if ($@) {
                                err($self,
                                    "%s during long response[$fd] - %0.6f",
-                                   $@, now() - $t0);
-                       }
-                       if ($self->{sock}) {
-                               update_idle_time($self);
-                               requeue($self);
-                       } else {
-                               out($self, " deferred[$fd] aborted - %0.6f",
-                                          now() - $t0);
+                                   $@, $diff);
                        }
+                       out($self, " deferred[$fd] aborted - %0.6f", $diff);
+                       $self->close;
                } elsif ($more) { # $self->{wbuf}:
+                       update_idle_time($self);
+
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
-                       update_idle_time($self);
-                       requeue($self);
+                       my $wbuf = $self->{wbuf} ||= [];
+                       push @$wbuf, $long_cb;
+
+                       # wbuf may be populated by $cb, no need to rearm if so:
+                       requeue($self) if scalar(@$wbuf) == 1;
                } else { # all done!
-                       delete $self->{long_res};
+                       $long_cb = undef;
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
-                       requeue($self);
+                       requeue($self) unless $self->{wbuf};
                }
        };
-       $self->{long_res}->(); # kick off!
+       $self->write($long_cb); # kick off!
        undef;
 }
 
@@ -973,7 +965,7 @@ sub event_step {
        # otherwise we can be buffering infinitely w/o backpressure
 
        use constant LINE_MAX => 512; # RFC 977 section 2.3
-       my $rbuf = \($self->{rbuf});
+       my $rbuf = $self->{rbuf} // (\(my $x = ''));
        my $r = 1;
 
        if (index($$rbuf, "\n") < 0) {
@@ -986,19 +978,23 @@ sub event_step {
                my $t0 = now();
                my $fd = fileno($self->{sock});
                $r = eval { process_line($self, $line) };
-               my $d = $self->{long_res} ?
-                       " deferred[$fd]" : '';
-               out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+               my $pending = $self->{wbuf} ? ' pending' : '';
+               out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
        }
 
        return $self->close if $r < 0;
        my $len = bytes::length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
+       if ($len) {
+               $self->{rbuf} = $rbuf;
+       } else {
+               delete $self->{rbuf};
+       }
        update_idle_time($self);
 
        # maybe there's more pipelined data, or we'll have
        # to register it for socket-readiness notifications
-       requeue($self) unless ($self->{long_res} || $self->{wbuf});
+       requeue($self) unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
@@ -1012,8 +1008,7 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
        my ($self, $now) = @_;
-       ($self->{rbuf} ne '' || $self->{long_res} ||
-               $self->{wbuf} || not_idle_long($self, $now));
+       ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;