use constant {
LINE_MAX => 512, # RFC 977 section 2.3
r501 => '501 command syntax error',
+ r502 => '502 Command unavailable',
r221 => '221 Header follows',
r224 => '224 Overview information follows (multi-line)',
r225 => '225 Headers follow (multi-line)',
HDR\r
OVER\r
+my $have_deflate;
my $EXPMAP; # fd -> [ idle_time, $self ]
my $expt;
our $EXPTIME = 180; # 3 minutes
});
}
+sub compressed { undef }
+
sub cmd_starttls ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
# RFC 4642 2.2.1
- (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+ return r502 if (ref($sock) eq 'IO::Socket::SSL' || $self->compressed);
my $opt = $self->{nntpd}->{accept_tls} or
return '580 can not initiate TLS negotiation';
res($self, '382 Continue with TLS negotiation');
undef;
}
+# RFC 8054
+sub cmd_compress ($$) {
+ my ($self, $alg) = @_;
+ return '503 Only the DEFLATE is supported' if uc($alg) ne 'DEFLATE';
+ return r502 if $self->compressed || !$have_deflate;
+ res($self, '206 Compression active');
+ PublicInbox::NNTPdeflate->enable($self);
+ $self->requeue;
+ undef
+}
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
}
+# this is an import to prevent "perl -c" from complaining about fields
+sub import {
+ $have_deflate = eval { require PublicInbox::NNTPdeflate } and
+ $CAPABILITIES .= "COMPRESS DEFLATE\r\n";
+}
+
1;
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# RFC 8054 NNTP COMPRESS DEFLATE implementation
+# Warning, enabling compression for C10K NNTP clients is rather
+# expensive in terms of memory use.
+#
+# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
+# TLS + DEFLATE : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8)
+# TLS only : <200MB
+# plain : <50MB
+package PublicInbox::NNTPdeflate;
+use strict;
+use warnings;
+use 5.010_001;
+use base qw(PublicInbox::NNTP);
+use Compress::Raw::Zlib;
+use Hash::Util qw(unlock_hash); # dependency of fields for perl 5.10+, anyways
+
+my %IN_OPT = (
+ -Bufsize => PublicInbox::NNTP::LINE_MAX,
+ -WindowBits => -15, # RFC 1951
+ -AppendOutput => 1,
+);
+
+my %OUT_OPT = (
+ # nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
+ # but the zlib C library and git use MemLevel=8
+ # as the default. Using 8 drops our memory use with 10K
+ # TLS clients from 1.8 GB to 1.2 GB, but...
+ # FIXME: sometimes clients fail with 8, so we use 9
+ # -MemLevel => 9,
+
+ # needs more testing, nothing obviously different in terms of memory
+ -Bufsize => 65536,
+
+ -WindowBits => -15, # RFC 1951
+ -AppendOutput => 1,
+);
+
+sub enable {
+ my ($class, $self) = @_;
+ unlock_hash(%$self);
+ bless $self, $class;
+ $self->{zin} = [ Compress::Raw::Zlib::Inflate->new(%IN_OPT), '' ];
+ $self->{zout} = [ Compress::Raw::Zlib::Deflate->new(%OUT_OPT), '' ];
+}
+
+# overrides PublicInbox::NNTP::compressed
+sub compressed { 1 }
+
+# SUPER is PublicInbox::DS::do_read, so $_[1] may be a reference or not
+sub do_read ($$$$) {
+ my ($self, $rbuf, $len, $off) = @_;
+
+ my $zin = $self->{zin} or return; # closed
+ my $deflated = \($zin->[1]);
+ my $r = $self->SUPER::do_read($deflated, $len) or return;
+
+ # assert(length($$rbuf) == $off) as far as NNTP.pm is concerned
+ # -ConsumeInput is true, so $deflated is automatically emptied
+ my $err = $zin->[0]->inflate($deflated, $rbuf);
+ if ($err == Z_OK) {
+ $r = length($$rbuf) and return $r;
+ # nothing ready, yet, get more, later
+ $self->requeue;
+ } else {
+ delete $self->{zin};
+ $self->close;
+ }
+ 0;
+}
+
+# override PublicInbox::DS::msg_more
+sub msg_more ($$) {
+ my $self = $_[0];
+ my $zout = $self->{zout};
+
+ # $_[1] may be a reference or not for ->deflate
+ my $err = $zout->[0]->deflate($_[1], $zout->[1]);
+ $err == Z_OK or die "->deflate failed $err";
+ 1;
+}
+
+# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not
+sub write ($$) {
+ my $self = $_[0];
+ return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE';
+ my $zout = $self->{zout};
+ my $deflated = pop @$zout;
+
+ # $_[1] may be a reference or not for ->deflate
+ my $err = $zout->[0]->deflate($_[1], $deflated);
+ $err == Z_OK or die "->deflate failed $err";
+ $err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH);
+ $err == Z_OK or die "->flush failed $err";
+
+ # PublicInbox::DS::write puts partial writes into another buffer,
+ # so we can prepare the next deflate buffer:
+ $zout->[1] = '';
+ $self->SUPER::write(\$deflated);
+}
+
+1;
--- /dev/null
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Integration test to validate compression.
+use strict;
+use warnings;
+use File::Temp qw(tempdir);
+use Test::More;
+use Symbol qw(gensym);
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+my $inbox_dir = $ENV{GIANT_INBOX_DIR};
+plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inbox_dir;
+my $mid = $ENV{TEST_MID};
+
+# This test is also an excuse for me to experiment with Perl threads :P
+unless (eval 'use threads; 1') {
+ plan skip_all => "$0 requires a threaded perl" if $@;
+}
+
+# Net::NNTP is part of the standard library, but distros may split it off...
+foreach my $mod (qw(DBD::SQLite Net::NNTP Compress::Raw::Zlib)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for $0" if $@;
+}
+
+my $test_compress = Net::NNTP->can('compress');
+if (!$test_compress) {
+ diag 'Your Net::NNTP does not yet support compression';
+ diag 'See: https://rt.cpan.org/Ticket/Display.html?id=129967';
+}
+my $test_tls = $ENV{TEST_SKIP_TLS} ? 0 : eval { require IO::Socket::SSL };
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+if ($test_tls && !-r $key || !-r $cert) {
+ plan skip_all => "certs/ missing for $0, run ./certs/create-certs.perl";
+}
+require './t/common.perl';
+my $keep_tmp = !!$ENV{TEST_KEEP_TMP};
+my $tmpdir = tempdir('nntpd-validate-XXXXXX',TMPDIR => 1,CLEANUP => $keep_tmp);
+my (%OPT, $pid, $tail_pid, $host_port, $group);
+my $batch = 1000;
+END {
+ foreach ($pid, $tail_pid) {
+ kill 'TERM', $_ if defined $_;
+ }
+};
+if (($ENV{NNTP_TEST_URL} // '') =~ m!\Anntp://([^/]+)/([^/]+)\z!) {
+ ($host_port, $group) = ($1, $2);
+ $host_port .= ":119" unless index($host_port, ':') > 0;
+} else {
+ make_local_server();
+}
+my $test_article = $ENV{TEST_ARTICLE} // 0;
+my $test_xover = $ENV{TEST_XOVER} // 1;
+
+if ($test_tls) {
+ my $nntp = Net::NNTP->new($host_port, %OPT);
+ ok($nntp->starttls, 'STARTTLS works');
+ ok($nntp->compress, 'COMPRESS works') if $test_compress;
+ ok($nntp->quit, 'QUIT after starttls OK');
+}
+if ($test_compress) {
+ my $nntp = Net::NNTP->new($host_port, %OPT);
+ ok($nntp->compress, 'COMPRESS works');
+ ok($nntp->quit, 'QUIT after compress OK');
+}
+
+sub do_get_all {
+ my ($methods) = @_;
+ my $desc = join(',', @$methods);
+ my $t0 = clock_gettime(CLOCK_MONOTONIC);
+ my $dig = Digest::SHA->new(1);
+ my $digfh = gensym;
+ my $tmpfh;
+ if ($keep_tmp) {
+ open $tmpfh, '>', "$tmpdir/$desc.raw" or die $!;
+ }
+ my $tmp = { dig => $dig, tmpfh => $tmpfh };
+ tie *$digfh, 'DigestPipe', $tmp;
+ my $nntp = Net::NNTP->new($host_port, %OPT);
+ $nntp->article("<$mid>", $digfh) if $mid;
+ foreach my $m (@$methods) {
+ my $res = $nntp->$m;
+ print STDERR "# $m got $res ($desc)\n" if !$res;
+ }
+ $nntp->article("<$mid>", $digfh) if $mid;
+ my ($num, $first, $last) = $nntp->group($group);
+ unless (defined $num && defined $first && defined $last) {
+ warn "Invalid group\n";
+ return undef;
+ }
+ my $i;
+ for ($i = $first; $i < $last; $i += $batch) {
+ my $j = $i + $batch - 1;
+ $j = $last if $j > $last;
+ if ($test_xover) {
+ my $xover = $nntp->xover("$i-$j");
+ for my $n (sort { $a <=> $b } keys %$xover) {
+ my $line = join("\t", @{$xover->{$n}});
+ $line =~ tr/\r//d;
+ $dig->add("$n\t".$line);
+ }
+ }
+ if ($test_article) {
+ for my $n ($i..$j) {
+ $nntp->article($n, $digfh) and next;
+ next if $nntp->code == 423;
+ my $res = $nntp->code.' '. $nntp->message;
+
+ $res =~ tr/\r\n//d;
+ print STDERR "# Article $n ($desc): $res\n";
+ }
+ }
+ }
+ my $q = $nntp->quit;
+ print STDERR "# quit failed: ".$nntp->code."\n" if !$q;
+ my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0);
+ my $res = $dig->hexdigest;
+ print STDERR "# $desc - $res (${elapsed}s)\n";
+ $res;
+}
+my @tests = ([]);
+push @tests, [ 'compress' ] if $test_compress;
+push @tests, [ 'starttls' ] if $test_tls;
+push @tests, [ 'starttls', 'compress' ] if $test_tls && $test_compress;
+my (@keys, %thr, %res);
+for my $m (@tests) {
+ my $key = join(',', @$m);
+ push @keys, $key;
+ diag "$key start";
+ $thr{$key} = threads->create(\&do_get_all, $m);
+}
+
+$res{$_} = $thr{$_}->join for @keys;
+my $plain = $res{''};
+ok($plain, "plain got $plain");
+is($res{$_}, $plain, "$_ matches '' result") for @keys;
+
+done_testing();
+
+sub make_local_server {
+ require PublicInbox::Inbox;
+ $group = 'inbox.test.perf.nntpd';
+ my $ibx = { mainrepo => $inbox_dir, newsgroup => $group };
+ $ibx = PublicInbox::Inbox->new($ibx);
+ my $nntpd = 'blib/script/public-inbox-nntpd';
+ my $pi_config = "$tmpdir/config";
+ {
+ open my $fh, '>', $pi_config or die "open($pi_config): $!";
+ print $fh <<"" or die "print $pi_config: $!";
+[publicinbox "test"]
+ newsgroup = $group
+ mainrepo = $inbox_dir
+ address = test\@example.com
+
+ close $fh or die "close($pi_config): $!";
+ }
+ my ($out, $err) = ("$tmpdir/out", "$tmpdir/err");
+ for ($out, $err) {
+ open my $fh, '>', $_ or die "truncate: $!";
+ }
+ if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+ $tail_pid = fork;
+ if (defined $tail_pid && $tail_pid == 0) {
+ open STDOUT, '>&STDERR' or die ">&2 failed: $!";
+ exec(split(' ', $tail_cmd), $out, $err);
+ }
+ }
+ my $sock = tcp_server();
+ ok($sock, 'sock created');
+ $host_port = $sock->sockhost . ':' . $sock->sockport;
+
+ # not using multiple workers, here, since we want to increase
+ # the chance of tripping concurrency bugs within PublicInbox/NNTP*.pm
+ my $cmd = [ $nntpd, "--stdout=$out", "--stderr=$err", '-W0' ];
+ push @$cmd, "-lnntp://$host_port";
+ if ($test_tls) {
+ push @$cmd, "--cert=$cert", "--key=$key";
+ %OPT = (
+ SSL_hostname => 'server.local',
+ SSL_verifycn_name => 'server.local',
+ SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_PEER(),
+ SSL_ca_file => 'certs/test-ca.pem',
+ );
+ }
+ print STDERR "# CMD ". join(' ', @$cmd). "\n";
+ $pid = spawn_listener({ PI_CONFIG => $pi_config }, $cmd, [$sock]);
+}
+
+package DigestPipe;
+use strict;
+use warnings;
+
+sub TIEHANDLE {
+ my ($class, $self) = @_;
+ bless $self, $class;
+}
+
+sub PRINT {
+ my $self = shift;
+ my $data = join('', @_);
+ # Net::NNTP emit different line-endings depending on TLS or not...:
+ $data =~ tr/\r//d;
+ $self->{dig}->add($data);
+ if (my $tmpfh = $self->{tmpfh}) {
+ print $tmpfh $data;
+ }
+ 1;
+}
+1;