]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
Merge remote-tracking branch 'origin/nntp-tls'
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 659e44d5501194499eac24634c81243282bd4441..53e18281ce412d32d1eb120ccf990b6163d8c625 100644 (file)
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -24,7 +24,7 @@ use constant {
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
 };
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 use Errno qw(EAGAIN);
 
 my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
@@ -45,17 +45,7 @@ sub next_tick () {
        $nextt = undef;
        my $q = $nextq;
        $nextq = [];
-       foreach my $nntp (@$q) {
-               # for request && response protocols, always finish writing
-               # before finishing reading:
-               if (my $long_cb = $nntp->{long_res}) {
-                       $nntp->write($long_cb);
-               } else {
-                       # pipelined request, we bypassed socket-readiness
-                       # checks to get here:
-                       event_step($nntp);
-               }
-       }
+       event_step($_) for @$q;
 }
 
 sub requeue ($) {
@@ -74,11 +64,17 @@ sub expire_old () {
        my $exp = $EXPTIME;
        my $old = $now - $exp;
        my $nr = 0;
+       my $closed = 0;
        my %new;
        while (my ($fd, $v) = each %$EXPMAP) {
                my ($idle_time, $nntp) = @$v;
                if ($idle_time < $old) {
-                       $nntp->close; # idempotent
+                       if ($nntp->shutdn) {
+                               $closed++;
+                       } else {
+                               ++$nr;
+                               $new{$fd} = $v;
+                       }
                } else {
                        ++$nr;
                        $new{$fd} = $v;
@@ -91,27 +87,28 @@ sub expire_old () {
                $expt = undef;
                # noop to kick outselves out of the loop ASAP so descriptors
                # really get closed
-               PublicInbox::EvCleanup::asap(sub {});
+               PublicInbox::EvCleanup::asap(sub {}) if $closed;
        }
 }
 
+sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
+
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
-       my $ev = EPOLLOUT | EPOLLONESHOT;
-       my $wbuf = [];
+       my $ev = EPOLLIN;
+       my $wbuf;
        if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
-               $ev = PublicInbox::TLS::epollbit() or return $sock->close;
-               $ev |= EPOLLONESHOT;
-               $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+               $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+               $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
        }
-       $self->SUPER::new($sock, $ev);
+       $self->SUPER::new($sock, $ev | EPOLLONESHOT);
        $self->{nntpd} = $nntpd;
-       my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
-       open my $fh, '<:scalar',  \$greet or die "open :scalar: $!";
-       push @$wbuf, $fh;
-       $self->{wbuf} = $wbuf;
-       $self->{rbuf} = '';
+       if ($wbuf) {
+               $self->{wbuf} = $wbuf;
+       } else {
+               greet($self);
+       }
        update_idle_time($self);
        $expt ||= PublicInbox::EvCleanup::later(*expire_old);
        $self;
@@ -410,7 +407,7 @@ sub cmd_post ($) {
 sub cmd_quit ($) {
        my ($self) = @_;
        res($self, '205 closing connection - goodbye!');
-       $self->close;
+       $self->shutdn;
        undef;
 }
 
@@ -627,8 +624,7 @@ sub get_range ($$) {
 }
 
 sub long_response ($$) {
-       my ($self, $cb) = @_;
-       die "BUG: nested long response" if $self->{long_res};
+       my ($self, $cb) = @_; # cb returns true if more, false if done
 
        my $fd = fileno($self->{sock});
        defined $fd or return;
@@ -636,36 +632,38 @@ sub long_response ($$) {
        # clients should not be sending us stuff and making us do more
        # work while we are stream a response to them
        my $t0 = now();
-       $self->{long_res} = sub {
+       my $long_cb; # DANGER: self-referential
+       $long_cb = sub {
+               # wbuf is unset or empty, here; $cb may add to it
                my $more = eval { $cb->() };
                if ($@ || !$self->{sock}) { # something bad happened...
-                       delete $self->{long_res};
-
+                       $long_cb = undef;
+                       my $diff = now() - $t0;
                        if ($@) {
                                err($self,
                                    "%s during long response[$fd] - %0.6f",
-                                   $@, now() - $t0);
-                       }
-                       if ($self->{sock}) {
-                               update_idle_time($self);
-                               requeue($self);
-                       } else {
-                               out($self, " deferred[$fd] aborted - %0.6f",
-                                          now() - $t0);
+                                   $@, $diff);
                        }
+                       out($self, " deferred[$fd] aborted - %0.6f", $diff);
+                       $self->close;
                } elsif ($more) { # $self->{wbuf}:
+                       update_idle_time($self);
+
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
-                       update_idle_time($self);
-                       requeue($self);
+                       my $wbuf = $self->{wbuf} ||= [];
+                       push @$wbuf, $long_cb;
+
+                       # wbuf may be populated by $cb, no need to rearm if so:
+                       requeue($self) if scalar(@$wbuf) == 1;
                } else { # all done!
-                       delete $self->{long_res};
+                       $long_cb = undef;
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
-                       requeue($self);
+                       requeue($self) unless $self->{wbuf};
                }
        };
-       $self->{long_res}->(); # kick off!
+       $self->write($long_cb); # kick off!
        undef;
 }
 
@@ -967,7 +965,7 @@ sub event_step {
        # otherwise we can be buffering infinitely w/o backpressure
 
        use constant LINE_MAX => 512; # RFC 977 section 2.3
-       my $rbuf = \($self->{rbuf});
+       my $rbuf = $self->{rbuf} // (\(my $x = ''));
        my $r = 1;
 
        if (index($$rbuf, "\n") < 0) {
@@ -980,19 +978,23 @@ sub event_step {
                my $t0 = now();
                my $fd = fileno($self->{sock});
                $r = eval { process_line($self, $line) };
-               my $d = $self->{long_res} ?
-                       " deferred[$fd]" : '';
-               out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+               my $pending = $self->{wbuf} ? ' pending' : '';
+               out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
        }
 
        return $self->close if $r < 0;
        my $len = bytes::length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
+       if ($len) {
+               $self->{rbuf} = $rbuf;
+       } else {
+               delete $self->{rbuf};
+       }
        update_idle_time($self);
 
        # maybe there's more pipelined data, or we'll have
        # to register it for socket-readiness notifications
-       requeue($self) unless ($self->{long_res} || $self->{wbuf});
+       requeue($self) unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
@@ -1006,8 +1008,7 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
        my ($self, $now) = @_;
-       ($self->{rbuf} ne '' || $self->{long_res} ||
-               $self->{wbuf} || not_idle_long($self, $now));
+       ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;