-# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# Each instance of this represents a NNTP client socket
require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+PublicInbox::DS->import(qw(now msg_more));
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
-
-sub now () { clock_gettime(CLOCK_MONOTONIC) };
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
} else {
# pipelined request, we bypassed socket-readiness
# checks to get here:
- event_read($nntp);
-
- # maybe there's more pipelined data, or we'll have
- # to register it for socket-readiness notifications
- if (!$nntp->{long_res} && !$nntp->{closed}) {
- check_read($nntp);
- }
+ event_step($nntp);
}
}
}
+sub requeue ($) {
+ push @$nextq, $_[0];
+ $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+}
+
sub update_idle_time ($) {
my ($self) = @_;
- my $fd = $self->{fd};
- defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
+ my $sock = $self->{sock} or return;
+ $EXPMAP->{fileno($sock)} = [ now(), $self ];
}
sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ my $ev = EPOLLOUT | EPOLLONESHOT;
+ my $wbuf = [];
+ if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ $ev = PublicInbox::TLS::epollbit() or return $sock->close;
+ $ev |= EPOLLONESHOT;
+ $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+ }
+ $self->SUPER::new($sock, $ev);
$self->{nntpd} = $nntpd;
- res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
+ my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
+ open my $fh, '<:scalar', \$greet or die "open :scalar: $!";
+ push @$wbuf, $fh;
+ $self->{wbuf} = $wbuf;
$self->{rbuf} = '';
- $self->watch_read(1);
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
my $res = eval { $req->($self, @args) };
my $err = $@;
- if ($err && !$self->{closed}) {
+ if ($err && $self->{sock}) {
local $/ = "\n";
chomp($l);
err($self, 'error from: %s (%s)', $l, $err);
sub list_overview_fmt ($) {
my ($self) = @_;
- do_more($self, $OVERVIEW_FMT);
+ msg_more($self, $OVERVIEW_FMT);
}
sub list_headers ($;$) {
my ($self) = @_;
- do_more($self, $LIST_HEADERS);
+ msg_more($self, $LIST_HEADERS);
}
sub list_active ($;$) {
}
my @now = $gmt ? gmtime : localtime;
my ($YYYY, $MM, $DD);
- if (length($date) == 8) { # RFC 3977 allows YYYYMMDD
+ if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD
($YYYY, $MM, $DD) = unpack('A4A2A2', $date);
} else { # legacy clients send YYMMDD
($YYYY, $MM, $DD) = unpack('A2A2A2', $date);
# reason. We'll fake the shortest one possible.
$hdr->header_set('Path', 'y');
+ # leafnode (and maybe other NNTP clients) have trouble dealing
+ # with v2 messages which have multiple Message-IDs (either due
+ # to our own content-based dedupe or buggy git-send-email versions).
+ my @mids = $hdr->header('Message-ID');
+ if (scalar(@mids) > 1) {
+ my $mid0 = "<$mid>";
+ $hdr->header_set('Message-ID', $mid0);
+ my @alt = $hdr->header('X-Alt-Message-ID');
+ my %seen = map { $_ => 1 } (@alt, $mid0);
+ foreach my $m (@mids) {
+ next if $seen{$m}++;
+ push @alt, $m;
+ }
+ $hdr->header_set('X-Alt-Message-ID', @alt);
+ }
+
# clobber some
my $xref = xref($self, $ng, $n, $mid);
$hdr->header_set('Xref', $xref);
$s->body_set('');
$body =~ s/^\./../smg;
$body =~ s/(?<!\r)\n/\r\n/sg;
- do_more($self, $body);
- do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+ msg_more($self, $body);
+ msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
'.'
}
my $hdr = $_[0]->header_obj->as_string;
utf8::encode($hdr);
$hdr =~ s/(?<!\r)\n/\r\n/sg;
+
+ # for leafnode compatibility, we need to ensure Message-ID headers
+ # are only a single line. We can't subclass Email::Simple::Header
+ # and override _default_fold_at in here, either; since that won't
+ # affect messages already in the archive.
+ $hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
+
$hdr
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "220 $n <$mid> article retrieved - head and body follow");
- do_more($self, _header($s));
- do_more($self, "\r\n");
+ msg_more($self, _header($s));
+ msg_more($self, "\r\n");
simple_body_write($self, $s);
}
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "221 $n <$mid> article retrieved - head follows");
- do_more($self, _header($s));
+ msg_more($self, _header($s));
'.'
}
my ($self, $cb) = @_;
die "BUG: nested long response" if $self->{long_res};
- my $fd = $self->{fd};
+ my $fd = fileno($self->{sock});
defined $fd or return;
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
- $self->watch_read(0);
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || $self->{closed}) {
- $self->{long_res} = undef;
+ if ($@ || !$self->{sock}) { # something bad happened...
+ delete $self->{long_res};
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
$@, now() - $t0);
}
- if ($self->{closed}) {
+ if ($self->{sock}) {
+ update_idle_time($self);
+ requeue($self);
+ } else {
out($self, " deferred[$fd] aborted - %0.6f",
now() - $t0);
- } else {
- update_idle_time($self);
- check_read($self);
}
- } elsif ($more) { # $self->{write_buf_size}:
+ } elsif ($more) { # $self->{wbuf}:
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
-
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+ requeue($self);
} else { # all done!
- $self->{long_res} = undef;
- check_read($self);
+ delete $self->{long_res};
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+ requeue($self);
}
};
$self->{long_res}->(); # kick off!
$tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
}
utf8::encode($tmp);
- do_more($self, $tmp);
+ msg_more($self, $tmp);
$cur = $msgs->[-1]->{num} + 1;
});
}
});
}
+sub cmd_starttls ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ # RFC 4642 2.2.1
+ (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+ my $opt = $self->{nntpd}->{accept_tls} or
+ return '580 can not initiate TLS negotiation';
+ res($self, '382 Continue with TLS negotiation');
+ $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+ requeue($self) if PublicInbox::DS::accept_tls_step($self);
+ undef;
+}
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
'223 '.join(' ', @paths);
}
-sub res ($$) {
- my ($self, $line) = @_;
- do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
-sub more ($$) {
- my ($self, $line) = @_;
- do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
sub do_write ($$) {
- my ($self, $data) = @_;
- my $done = $self->write($data);
- return 0 if $self->{closed};
-
- # Do not watch for readability if we have data in the queue,
- # instead re-enable watching for readability when we can
- $self->watch_read(0) if (!$done || $self->{long_res});
+ my $self = $_[0];
+ my $done = $self->write(\($_[1]));
+ return 0 unless $self->{sock};
$done;
}
printf { $self->{nntpd}->{out} } $fmt."\n", @args;
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
- my ($self, $data) = @_;
- if (MSG_MORE && !$self->{write_buf_size}) {
- my $n = send($self->{sock}, $data, MSG_MORE);
- if (defined $n) {
- my $dlen = length($data);
- return 1 if $n == $dlen; # all done!
- $data = substr($data, $n, $dlen - $n);
- }
- }
- do_write($self, $data);
-}
-
-# callbacks for PublicInbox::DS
+# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
+sub event_step {
+ my ($self) = @_;
-sub event_hup { $_[0]->close }
-sub event_err { $_[0]->close }
+ return unless $self->flush_write && $self->{sock};
-sub event_write {
- my ($self) = @_;
update_idle_time($self);
- # only continue watching for readability when we are done writing:
- if ($self->write(undef) == 1 && !$self->{long_res}) {
- $self->watch_read(1);
- }
-}
+ # only read more requests if we've drained the write buffer,
+ # otherwise we can be buffering infinitely w/o backpressure
-sub event_read {
- my ($self) = @_;
use constant LINE_MAX => 512; # RFC 977 section 2.3
+ my $rbuf = \($self->{rbuf});
+ my $r = 1;
- if (index($self->{rbuf}, "\n") < 0) {
- my $buf = $self->read(LINE_MAX) or return $self->close;
- $self->{rbuf} .= $$buf;
+ if (index($$rbuf, "\n") < 0) {
+ my $off = bytes::length($$rbuf);
+ $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
}
- my $r = 1;
- while ($r > 0 && $self->{rbuf} =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
+ while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
my $line = $1;
return $self->close if $line =~ /[[:cntrl:]]/s;
my $t0 = now();
- my $fd = $self->{fd};
+ my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
my $d = $self->{long_res} ?
" deferred[$fd]" : '';
}
return $self->close if $r < 0;
- my $len = length($self->{rbuf});
+ my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
-}
-sub check_read {
- my ($self) = @_;
- if (index($self->{rbuf}, "\n") >= 0) {
- # Force another read if there is a pipelined request.
- # We don't know if the socket has anything for us to read,
- # and we must double-check again by the time the timer fires
- # in case we really did dispatch a read event and started
- # another long response.
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
- } else {
- # no pipelined requests available, let the kernel know
- # to wake us up if there's more
- $self->watch_read(1); # PublicInbox::DS::watch_read
- }
+ # maybe there's more pipelined data, or we'll have
+ # to register it for socket-readiness notifications
+ requeue($self) unless ($self->{long_res} || $self->{wbuf});
}
sub not_idle_long ($$) {
my ($self, $now) = @_;
- defined(my $fd = $self->{fd}) or return;
- my $ary = $EXPMAP->{$fd} or return;
+ my $sock = $self->{sock} or return;
+ my $ary = $EXPMAP->{fileno($sock)} or return;
my $exp_at = $ary->[0] + $EXPTIME;
$exp_at > $now;
}
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} || $self->{write_buf_size} ||
- not_idle_long($self, $now));
+ ($self->{rbuf} ne '' || $self->{long_res} ||
+ $self->{wbuf} || not_idle_long($self, $now));
}
1;