-# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# Each instance of this represents a NNTP client socket
require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
-PublicInbox::DS->import('now');
+PublicInbox::DS->import(qw(now msg_more));
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
# pipelined request, we bypassed socket-readiness
# checks to get here:
event_step($nntp);
-
- # maybe there's more pipelined data, or we'll have
- # to register it for socket-readiness notifications
- if (!$nntp->{long_res} && $nntp->{sock}) {
- check_read($nntp);
- }
}
}
}
+sub requeue ($) {
+ push @$nextq, $_[0];
+ $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+}
+
sub update_idle_time ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
my $exp = $EXPTIME;
my $old = $now - $exp;
my $nr = 0;
+ my $closed = 0;
my %new;
while (my ($fd, $v) = each %$EXPMAP) {
my ($idle_time, $nntp) = @$v;
if ($idle_time < $old) {
- $nntp->close; # idempotent
+ if ($nntp->shutdn) {
+ $closed++;
+ } else {
+ ++$nr;
+ $new{$fd} = $v;
+ }
} else {
++$nr;
$new{$fd} = $v;
$expt = undef;
# noop to kick outselves out of the loop ASAP so descriptors
# really get closed
- PublicInbox::EvCleanup::asap(sub {});
+ PublicInbox::EvCleanup::asap(sub {}) if $closed;
}
}
+sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
+
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ my $ev = EPOLLOUT | EPOLLONESHOT;
+ my $wbuf = [];
+ if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+ $ev |= EPOLLONESHOT;
+ $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+ }
+ $self->SUPER::new($sock, $ev);
$self->{nntpd} = $nntpd;
- res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
+ push @$wbuf, \&greet;
+ $self->{wbuf} = $wbuf;
$self->{rbuf} = '';
- $self->watch_read(1);
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
sub list_overview_fmt ($) {
my ($self) = @_;
- do_more($self, $OVERVIEW_FMT);
+ msg_more($self, $OVERVIEW_FMT);
}
sub list_headers ($;$) {
my ($self) = @_;
- do_more($self, $LIST_HEADERS);
+ msg_more($self, $LIST_HEADERS);
}
sub list_active ($;$) {
}
my @now = $gmt ? gmtime : localtime;
my ($YYYY, $MM, $DD);
- if (length($date) == 8) { # RFC 3977 allows YYYYMMDD
+ if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD
($YYYY, $MM, $DD) = unpack('A4A2A2', $date);
} else { # legacy clients send YYMMDD
($YYYY, $MM, $DD) = unpack('A2A2A2', $date);
sub cmd_quit ($) {
my ($self) = @_;
res($self, '205 closing connection - goodbye!');
- $self->close;
+ $self->shutdn;
undef;
}
$s->body_set('');
$body =~ s/^\./../smg;
$body =~ s/(?<!\r)\n/\r\n/sg;
- do_more($self, $body);
- do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+ msg_more($self, $body);
+ msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
'.'
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "220 $n <$mid> article retrieved - head and body follow");
- do_more($self, _header($s));
- do_more($self, "\r\n");
+ msg_more($self, _header($s));
+ msg_more($self, "\r\n");
simple_body_write($self, $s);
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "221 $n <$mid> article retrieved - head follows");
- do_more($self, _header($s));
+ msg_more($self, _header($s));
'.'
}
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
- $self->watch_read(0);
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || !$self->{sock}) {
- $self->{long_res} = undef;
+ if ($@ || !$self->{sock}) { # something bad happened...
+ delete $self->{long_res};
if ($@) {
err($self,
}
if ($self->{sock}) {
update_idle_time($self);
- check_read($self);
+ requeue($self);
} else {
out($self, " deferred[$fd] aborted - %0.6f",
now() - $t0);
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
-
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+ requeue($self);
} else { # all done!
- $self->{long_res} = undef;
- check_read($self);
+ delete $self->{long_res};
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+ requeue($self);
}
};
$self->{long_res}->(); # kick off!
$tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
}
utf8::encode($tmp);
- do_more($self, $tmp);
+ msg_more($self, $tmp);
$cur = $msgs->[-1]->{num} + 1;
});
}
});
}
+sub cmd_starttls ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ # RFC 4642 2.2.1
+ (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+ my $opt = $self->{nntpd}->{accept_tls} or
+ return '580 can not initiate TLS negotiation';
+ res($self, '382 Continue with TLS negotiation');
+ $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+ requeue($self) if PublicInbox::DS::accept_tls_step($self);
+ undef;
+}
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
'223 '.join(' ', @paths);
}
-sub res ($$) {
- my ($self, $line) = @_;
- do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
-sub more ($$) {
- my ($self, $line) = @_;
- do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
sub do_write ($$) {
- my ($self, $data) = @_;
- my $done = $self->write($data);
+ my $self = $_[0];
+ my $done = $self->write(\($_[1]));
return 0 unless $self->{sock};
- # Do not watch for readability if we have data in the queue,
- # instead re-enable watching for readability when we can
- $self->watch_read(0) if (!$done || $self->{long_res});
-
$done;
}
printf { $self->{nntpd}->{out} } $fmt."\n", @args;
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
- my ($self, $data) = @_;
- if (MSG_MORE && !$self->{wbuf}) {
- my $n = send($self->{sock}, $data, MSG_MORE);
- if (defined $n) {
- my $dlen = length($data);
- return 1 if $n == $dlen; # all done!
- $data = substr($data, $n, $dlen - $n);
- }
- }
- do_write($self, $data);
-}
-
+# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
sub event_step {
my ($self) = @_;
return unless $self->flush_write && $self->{sock};
- return if $self->{long_res};
update_idle_time($self);
# only read more requests if we've drained the write buffer,
use constant LINE_MAX => 512; # RFC 977 section 2.3
my $rbuf = \($self->{rbuf});
- my $r;
+ my $r = 1;
if (index($$rbuf, "\n") < 0) {
- my $off = length($$rbuf);
- $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
- unless (defined $r) {
- return if $!{EAGAIN};
- return $self->close;
- }
- return $self->close if $r == 0;
+ my $off = bytes::length($$rbuf);
+ $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
}
- $r = 1;
while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
my $line = $1;
return $self->close if $line =~ /[[:cntrl:]]/s;
}
return $self->close if $r < 0;
- my $len = length($$rbuf);
+ my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
-}
-sub check_read {
- my ($self) = @_;
- if (index($self->{rbuf}, "\n") >= 0) {
- # Force another read if there is a pipelined request.
- # We don't know if the socket has anything for us to read,
- # and we must double-check again by the time the timer fires
- # in case we really did dispatch a read event and started
- # another long response.
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
- } else {
- # no pipelined requests available, let the kernel know
- # to wake us up if there's more
- $self->watch_read(1); # PublicInbox::DS::watch_read
- }
+ # maybe there's more pipelined data, or we'll have
+ # to register it for socket-readiness notifications
+ requeue($self) unless ($self->{long_res} || $self->{wbuf});
}
sub not_idle_long ($$) {