lib/PublicInbox/DS.pm | 4 +++-
lib/PublicInbox/NNTP.pm | 7 +++++++
lib/PublicInbox/NNTPdeflate.pm | 59 +++++++++++++++++++++++++++++++++--------------------
t/nntpd-validate.t | 8 +++++++-
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 586c47cdc8cef94e05509dfbf0a82136a147f870..b16b189629ff16d459bc7c79bb0155e43d9642ea 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -553,7 +553,9 @@ epwait($sock, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
- $self->write(\($_[1]));
+
+ # call DS::write directly: NNTPdeflate::zflush passes already-deflated data
+ PublicInbox::DS::write($self, \($_[1]));
}
sub epwait ($$) {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index d6f315bab42a9ed7e0b8b607a3ba6cf01066ef55..895858b754cd44e0a95e99889075a444b54393ee 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -642,6 +642,11 @@ $self->close;
} elsif ($more) { # $self->{wbuf}:
update_idle_time($self);
+ # COMPRESS users all share one global DEFLATE context; flush
+ # this client's pending output before yielding so it is never
+ # emitted into another client's stream.
+ $self->zflush;
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
my $wbuf = $self->{wbuf} ||= [];
@@ -924,6 +929,8 @@ PublicInbox::NNTPdeflate->enable($self);
$self->requeue;
undef
}
+
+sub zflush {} # overridden by NNTPdeflate
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
index 66210bfa84c040c1a2caa90f8d4c01e59d2e5c67..78da2a588674fe59b48ab1793ab1b2cfbc9dc576 100644
--- a/lib/PublicInbox/NNTPdeflate.pm
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -2,13 +2,18 @@ # Copyright (C) 2019 all contributors
# License: AGPL-3.0+
# RFC 8054 NNTP COMPRESS DEFLATE implementation
-# Warning, enabling compression for C10K NNTP clients is rather
-# expensive in terms of memory use.
#
# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
-# TLS + DEFLATE : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8)
-# TLS only : <200MB
-# plain : <50MB
+# TLS + DEFLATE[a] : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8)
+# TLS + DEFLATE[b] : ~300MB
+# TLS only : <200MB
+# plain : <50MB
+#
+# [a] - initial implementation using per-client deflate contexts and buffers
+#
+# [b] - memory-optimized implementation using one global deflate context.
+# It compresses less effectively (every write ends with Z_FULL_FLUSH,
+# which discards the dictionary), but uses far less server memory.
package PublicInbox::NNTPdeflate;
use strict;
use warnings;
@@ -23,11 +28,11 @@ -WindowBits => -15, # RFC 1951
-AppendOutput => 1,
);
-my %OUT_OPT = (
+# global deflate context and buffer
+my $zbuf = \(my $buf = '');
+my $zout = Compress::Raw::Zlib::Deflate->new(
# nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
- # but the zlib C library and git use MemLevel=8
- # as the default. Using 8 drops our memory use with 10K
- # TLS clients from 1.8 GB to 1.2 GB, but...
+ # but the zlib C library and git use MemLevel=8 as the default.
# FIXME: sometimes clients fail with 8, so we use 9
# -MemLevel => 9,
@@ -43,7 +48,6 @@ my ($class, $self) = @_;
unlock_hash(%$self);
bless $self, $class;
$self->{zin} = [ Compress::Raw::Zlib::Inflate->new(%IN_OPT), '' ];
- $self->{zout} = [ Compress::Raw::Zlib::Deflate->new(%OUT_OPT), '' ];
}
# overrides PublicInbox::NNTP::compressed
@@ -74,31 +78,42 @@
# override PublicInbox::DS::msg_more
sub msg_more ($$) {
my $self = $_[0];
- my $zout = $self->{zout};
# $_[1] may be a reference or not for ->deflate
- my $err = $zout->[0]->deflate($_[1], $zout->[1]);
+ my $err = $zout->deflate($_[1], $zbuf);
$err == Z_OK or die "->deflate failed $err";
1;
}
-# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not
+sub zflush ($) {
+ my ($self) = @_;
+
+ my $deflated = $zbuf;
+ $zbuf = \(my $next = '');
+
+ my $err = $zout->flush($deflated, Z_FULL_FLUSH);
+ $err == Z_OK or die "->flush failed $err";
+
+ # We can still let the lower socket layer do buffering:
+ PublicInbox::DS::msg_more($self, $$deflated);
+}
+
+# compatible with PublicInbox::DS::write, so $_[1] may be a reference or not
sub write ($$) {
my $self = $_[0];
- return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE';
- my $zout = $self->{zout};
- my $deflated = pop @$zout;
+ return PublicInbox::DS::write($self, $_[1]) if ref($_[1]) eq 'CODE';
+
+ my $deflated = $zbuf;
+ $zbuf = \(my $next = '');
# $_[1] may be a reference or not for ->deflate
- my $err = $zout->[0]->deflate($_[1], $deflated);
+ my $err = $zout->deflate($_[1], $deflated);
$err == Z_OK or die "->deflate failed $err";
- $err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH);
+ $err = $zout->flush($deflated, Z_FULL_FLUSH);
$err == Z_OK or die "->flush failed $err";
- # PublicInbox::DS::write puts partial writes into another buffer,
- # so we can prepare the next deflate buffer:
- $zout->[1] = '';
- $self->SUPER::write(\$deflated);
+ # We can still let the socket layer do buffering:
+ PublicInbox::DS::write($self, $deflated);
}
1;
diff --git a/t/nntpd-validate.t b/t/nntpd-validate.t
index 1a325105cb5a7f789cf5a6d5068f860c0a9b72fc..532ef7293320a1e86412944f31d92e2cb98d45f0 100644
--- a/t/nntpd-validate.t
+++ b/t/nntpd-validate.t
@@ -112,11 +112,17 @@ print STDERR "# Article $n ($desc): $res\n";
}
}
}
+
+ # bytes_read is a test-only Net::NNTP extension; report it when present:
+ my $bytes_read = '';
+ if ($nntp->can('bytes_read')) {
+ $bytes_read .= ' '.$nntp->bytes_read.'b';
+ }
my $q = $nntp->quit;
print STDERR "# quit failed: ".$nntp->code."\n" if !$q;
my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0);
my $res = $dig->hexdigest;
- print STDERR "# $desc - $res (${elapsed}s)\n";
+ print STDERR "# $desc - $res (${elapsed}s)$bytes_read\n";
$res;
}
my @tests = ([]);