]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
nntp: NNTPS and NNTP+STARTTLS working
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 8a31b910be76a317df6838879a5918a32d0c46b2..659e44d5501194499eac24634c81243282bd4441 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Each instance of this represents a NNTP client socket
@@ -14,7 +14,7 @@ use PublicInbox::Git;
 require PublicInbox::EvCleanup;
 use Email::Simple;
 use POSIX qw(strftime);
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+PublicInbox::DS->import(qw(now msg_more));
 use Digest::SHA qw(sha1_hex);
 use Time::Local qw(timegm timelocal);
 use constant {
@@ -24,8 +24,8 @@ use constant {
        r225 => '225 Headers follow (multi-line)',
        r430 => '430 No article with that message-id',
 };
-
-sub now () { clock_gettime(CLOCK_MONOTONIC) };
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use Errno qw(EAGAIN);
 
 my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
 my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -53,21 +53,20 @@ sub next_tick () {
                } else {
                        # pipelined request, we bypassed socket-readiness
                        # checks to get here:
-                       event_read($nntp);
-
-                       # maybe there's more pipelined data, or we'll have
-                       # to register it for socket-readiness notifications
-                       if (!$nntp->{long_res} && !$nntp->{closed}) {
-                               check_read($nntp);
-                       }
+                       event_step($nntp);
                }
        }
 }
 
+sub requeue ($) {
+       push @$nextq, $_[0];
+       $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+}
+
 sub update_idle_time ($) {
        my ($self) = @_;
-       my $fd = $self->{fd};
-       defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
+       my $sock = $self->{sock} or return;
+       $EXPMAP->{fileno($sock)} = [ now(), $self ];
 }
 
 sub expire_old () {
@@ -99,11 +98,20 @@ sub expire_old () {
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
-       $self->SUPER::new($sock);
+       my $ev = EPOLLOUT | EPOLLONESHOT;
+       my $wbuf = [];
+       if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+               $ev = PublicInbox::TLS::epollbit() or return $sock->close;
+               $ev |= EPOLLONESHOT;
+               $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+       }
+       $self->SUPER::new($sock, $ev);
        $self->{nntpd} = $nntpd;
-       res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
+       my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
+       open my $fh, '<:scalar',  \$greet or die "open :scalar: $!";
+       push @$wbuf, $fh;
+       $self->{wbuf} = $wbuf;
        $self->{rbuf} = '';
-       $self->watch_read(1);
        update_idle_time($self);
        $expt ||= PublicInbox::EvCleanup::later(*expire_old);
        $self;
@@ -133,7 +141,7 @@ sub process_line ($$) {
 
        my $res = eval { $req->($self, @args) };
        my $err = $@;
-       if ($err && !$self->{closed}) {
+       if ($err && $self->{sock}) {
                local $/ = "\n";
                chomp($l);
                err($self, 'error from: %s (%s)', $l, $err);
@@ -161,12 +169,12 @@ sub cmd_xgtitle ($;$) {
 
 sub list_overview_fmt ($) {
        my ($self) = @_;
-       do_more($self, $OVERVIEW_FMT);
+       msg_more($self, $OVERVIEW_FMT);
 }
 
 sub list_headers ($;$) {
        my ($self) = @_;
-       do_more($self, $LIST_HEADERS);
+       msg_more($self, $LIST_HEADERS);
 }
 
 sub list_active ($;$) {
@@ -250,7 +258,7 @@ sub parse_time ($$;$) {
        }
        my @now = $gmt ? gmtime : localtime;
        my ($YYYY, $MM, $DD);
-       if (length($date) == 8) { # RFC 3977 allows YYYYMMDD
+       if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD
                ($YYYY, $MM, $DD) = unpack('A4A2A2', $date);
        } else { # legacy clients send YYMMDD
                ($YYYY, $MM, $DD) = unpack('A2A2A2', $date);
@@ -521,8 +529,8 @@ sub simple_body_write ($$) {
        $s->body_set('');
        $body =~ s/^\./../smg;
        $body =~ s/(?<!\r)\n/\r\n/sg;
-       do_more($self, $body);
-       do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+       msg_more($self, $body);
+       msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
        '.'
 }
 
@@ -552,8 +560,8 @@ sub cmd_article ($;$) {
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
        more($self, "220 $n <$mid> article retrieved - head and body follow");
-       do_more($self, _header($s));
-       do_more($self, "\r\n");
+       msg_more($self, _header($s));
+       msg_more($self, "\r\n");
        simple_body_write($self, $s);
 }
 
@@ -564,7 +572,7 @@ sub cmd_head ($;$) {
        my ($n, $mid, $s) = @$r;
        set_art($self, $art);
        more($self, "221 $n <$mid> article retrieved - head follows");
-       do_more($self, _header($s));
+       msg_more($self, _header($s));
        '.'
 }
 
@@ -622,42 +630,39 @@ sub long_response ($$) {
        my ($self, $cb) = @_;
        die "BUG: nested long response" if $self->{long_res};
 
-       my $fd = $self->{fd};
+       my $fd = fileno($self->{sock});
        defined $fd or return;
        # make sure we disable reading during a long response,
        # clients should not be sending us stuff and making us do more
        # work while we are stream a response to them
-       $self->watch_read(0);
        my $t0 = now();
        $self->{long_res} = sub {
                my $more = eval { $cb->() };
-               if ($@ || $self->{closed}) {
-                       $self->{long_res} = undef;
+               if ($@ || !$self->{sock}) { # something bad happened...
+                       delete $self->{long_res};
 
                        if ($@) {
                                err($self,
                                    "%s during long response[$fd] - %0.6f",
                                    $@, now() - $t0);
                        }
-                       if ($self->{closed}) {
+                       if ($self->{sock}) {
+                               update_idle_time($self);
+                               requeue($self);
+                       } else {
                                out($self, " deferred[$fd] aborted - %0.6f",
                                           now() - $t0);
-                       } else {
-                               update_idle_time($self);
-                               check_read($self);
                        }
-               } elsif ($more) { # $self->{write_buf_size}:
+               } elsif ($more) { # $self->{wbuf}:
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
                        update_idle_time($self);
-
-                       push @$nextq, $self;
-                       $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+                       requeue($self);
                } else { # all done!
-                       $self->{long_res} = undef;
-                       check_read($self);
+                       delete $self->{long_res};
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+                       requeue($self);
                }
        };
        $self->{long_res}->(); # kick off!
@@ -764,7 +769,7 @@ sub hdr_searchmsg ($$$$) {
                                $tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
                        }
                        utf8::encode($tmp);
-                       do_more($self, $tmp);
+                       msg_more($self, $tmp);
                        $cur = $msgs->[-1]->{num} + 1;
                });
        }
@@ -903,6 +908,19 @@ sub cmd_xover ($;$) {
        });
 }
 
+sub cmd_starttls ($) {
+       my ($self) = @_;
+       my $sock = $self->{sock} or return;
+       # RFC 4642 2.2.1
+       (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+       my $opt = $self->{nntpd}->{accept_tls} or
+               return '580 can not initiate TLS negotiation';
+       res($self, '382 Continue with TLS negotiation');
+       $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+       requeue($self) if PublicInbox::DS::accept_tls_step($self);
+       undef;
+}
+
 sub cmd_xpath ($$) {
        my ($self, $mid) = @_;
        return r501 unless $mid =~ /\A<(.+)>\z/;
@@ -916,24 +934,14 @@ sub cmd_xpath ($$) {
        '223 '.join(' ', @paths);
 }
 
-sub res ($$) {
-       my ($self, $line) = @_;
-       do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
 
-sub more ($$) {
-       my ($self, $line) = @_;
-       do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
 
 sub do_write ($$) {
-       my ($self, $data) = @_;
-       my $done = $self->write($data);
-       return 0 if $self->{closed};
-
-       # Do not watch for readability if we have data in the queue,
-       # instead re-enable watching for readability when we can
-       $self->watch_read(0) if (!$done || $self->{long_res});
+       my $self = $_[0];
+       my $done = $self->write(\($_[1]));
+       return 0 unless $self->{sock};
 
        $done;
 }
@@ -948,49 +956,29 @@ sub out ($$;@) {
        printf { $self->{nntpd}->{out} } $fmt."\n", @args;
 }
 
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
-       my ($self, $data) = @_;
-       if (MSG_MORE && !$self->{write_buf_size}) {
-               my $n = send($self->{sock}, $data, MSG_MORE);
-               if (defined $n) {
-                       my $dlen = length($data);
-                       return 1 if $n == $dlen; # all done!
-                       $data = substr($data, $n, $dlen - $n);
-               }
-       }
-       do_write($self, $data);
-}
-
-# callbacks for PublicInbox::DS
+# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
+sub event_step {
+       my ($self) = @_;
 
-sub event_hup { $_[0]->close }
-sub event_err { $_[0]->close }
+       return unless $self->flush_write && $self->{sock};
 
-sub event_write {
-       my ($self) = @_;
        update_idle_time($self);
-       # only continue watching for readability when we are done writing:
-       if ($self->write(undef) == 1 && !$self->{long_res}) {
-               $self->watch_read(1);
-       }
-}
+       # only read more requests if we've drained the write buffer,
+       # otherwise we can be buffering infinitely w/o backpressure
 
-sub event_read {
-       my ($self) = @_;
        use constant LINE_MAX => 512; # RFC 977 section 2.3
+       my $rbuf = \($self->{rbuf});
+       my $r = 1;
 
-       if (index($self->{rbuf}, "\n") < 0) {
-               my $buf = $self->read(LINE_MAX) or return $self->close;
-               $self->{rbuf} .= $$buf;
+       if (index($$rbuf, "\n") < 0) {
+               my $off = bytes::length($$rbuf);
+               $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
        }
-       my $r = 1;
-       while ($r > 0 && $self->{rbuf} =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
+       while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
                my $line = $1;
                return $self->close if $line =~ /[[:cntrl:]]/s;
                my $t0 = now();
-               my $fd = $self->{fd};
+               my $fd = fileno($self->{sock});
                $r = eval { process_line($self, $line) };
                my $d = $self->{long_res} ?
                        " deferred[$fd]" : '';
@@ -998,32 +986,19 @@ sub event_read {
        }
 
        return $self->close if $r < 0;
-       my $len = length($self->{rbuf});
+       my $len = bytes::length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
        update_idle_time($self);
-}
 
-sub check_read {
-       my ($self) = @_;
-       if (index($self->{rbuf}, "\n") >= 0) {
-               # Force another read if there is a pipelined request.
-               # We don't know if the socket has anything for us to read,
-               # and we must double-check again by the time the timer fires
-               # in case we really did dispatch a read event and started
-               # another long response.
-               push @$nextq, $self;
-               $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-       } else {
-               # no pipelined requests available, let the kernel know
-               # to wake us up if there's more
-               $self->watch_read(1); # PublicInbox::DS::watch_read
-       }
+       # maybe there's more pipelined data, or we'll have
+       # to register it for socket-readiness notifications
+       requeue($self) unless ($self->{long_res} || $self->{wbuf});
 }
 
 sub not_idle_long ($$) {
        my ($self, $now) = @_;
-       defined(my $fd = $self->{fd}) or return;
-       my $ary = $EXPMAP->{$fd} or return;
+       my $sock = $self->{sock} or return;
+       my $ary = $EXPMAP->{fileno($sock)} or return;
        my $exp_at = $ary->[0] + $EXPTIME;
        $exp_at > $now;
 }
@@ -1031,8 +1006,8 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
        my ($self, $now) = @_;
-       ($self->{rbuf} ne '' || $self->{long_res} || $self->{write_buf_size} ||
-        not_idle_long($self, $now));
+       ($self->{rbuf} ne '' || $self->{long_res} ||
+               $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;