use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
$nextt = undef;
my $q = $nextq;
$nextq = [];
- foreach my $nntp (@$q) {
- # for request && response protocols, always finish writing
- # before finishing reading:
- if (my $long_cb = $nntp->{long_res}) {
- $nntp->write($long_cb);
- } else {
- # pipelined request, we bypassed socket-readiness
- # checks to get here:
- event_step($nntp);
- }
- }
+ event_step($_) for @$q;
}
sub requeue ($) {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- my $ev = EPOLLOUT | EPOLLONESHOT;
- my $wbuf = [];
+ my $ev = EPOLLIN;
+ my $wbuf;
if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
$ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
- $ev |= EPOLLONESHOT;
- $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+ $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
}
- $self->SUPER::new($sock, $ev);
+ $self->SUPER::new($sock, $ev | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
- push @$wbuf, \&greet;
- $self->{wbuf} = $wbuf;
- $self->{rbuf} = '';
+ if ($wbuf) {
+ $self->{wbuf} = $wbuf;
+ } else {
+ greet($self);
+ }
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
found:
my $smsg = $ng->over->get_art($n) or return $err;
my $msg = $ng->msg_by_smsg($smsg) or return $err;
- my $s = Email::Simple->new($msg);
- if ($set_headers) {
- set_nntp_headers($self, $s->header_obj, $ng, $n, $mid);
- # must be last
- $s->body_set('') if ($set_headers == 2);
- }
- [ $n, $mid, $s, $smsg->bytes, $smsg->lines, $ng ];
+ # Email::Simple->new will modify $msg in-place as documented
+ # in its manpage, so what's left is the body and we won't need
+ # to call Email::Simple::body(), later
+ my $hdr = Email::Simple->new($msg)->header_obj;
+ set_nntp_headers($self, $hdr, $ng, $n, $mid) if $set_headers;
+ [ $n, $mid, $msg, $hdr ];
}
-sub simple_body_write ($$) {
- my ($self, $s) = @_;
- my $body = $s->body;
- $s->body_set('');
- $body =~ s/^\./../smg;
- $body =~ s/(?<!\r)\n/\r\n/sg;
- msg_more($self, $body);
- msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+sub msg_body_write ($$) {
+ my ($self, $msg) = @_;
+
+ # these can momentarily double the memory consumption :<
+ $$msg =~ s/^\./../smg;
+ $$msg =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
+ $$msg .= "\r\n" unless $$msg =~ /\r\n\z/s;
+ msg_more($self, $$msg);
'.'
}
$self->{article} = $art if defined $art && $art =~ /\A[0-9]+\z/;
}
-sub _header ($) {
- my $hdr = $_[0]->header_obj->as_string;
+sub msg_hdr_write ($$$) {
+ my ($self, $hdr, $body_follows) = @_;
+ $hdr = $hdr->as_string;
utf8::encode($hdr);
- $hdr =~ s/(?<!\r)\n/\r\n/sg;
+ $hdr =~ s/(?<!\r)\n/\r\n/sg; # Alpine barfs without this
# for leafnode compatibility, we need to ensure Message-ID headers
# are only a single line. We can't subclass Email::Simple::Header
# and override _default_fold_at in here, either; since that won't
# affect messages already in the archive.
$hdr =~ s/^(Message-ID:)[ \t]*\r\n[ \t]+([^\r]+)\r\n/$1 $2\r\n/igsm;
-
- $hdr
+ $hdr .= "\r\n" if $body_follows;
+ msg_more($self, $hdr);
}
sub cmd_article ($;$) {
my ($self, $art) = @_;
my $r = art_lookup($self, $art, 1);
return $r unless ref $r;
- my ($n, $mid, $s) = @$r;
+ my ($n, $mid, $msg, $hdr) = @$r;
set_art($self, $art);
more($self, "220 $n <$mid> article retrieved - head and body follow");
- msg_more($self, _header($s));
- msg_more($self, "\r\n");
- simple_body_write($self, $s);
+ msg_hdr_write($self, $hdr, 1);
+ msg_body_write($self, $msg);
}
sub cmd_head ($;$) {
my ($self, $art) = @_;
my $r = art_lookup($self, $art, 2);
return $r unless ref $r;
- my ($n, $mid, $s) = @$r;
+ my ($n, $mid, undef, $hdr) = @$r;
set_art($self, $art);
more($self, "221 $n <$mid> article retrieved - head follows");
- msg_more($self, _header($s));
+ msg_hdr_write($self, $hdr, 0);
'.'
}
my ($self, $art) = @_;
my $r = art_lookup($self, $art, 0);
return $r unless ref $r;
- my ($n, $mid, $s) = @$r;
+ my ($n, $mid, $msg) = @$r;
set_art($self, $art);
more($self, "222 $n <$mid> article retrieved - body follows");
- simple_body_write($self, $s);
+ msg_body_write($self, $msg);
}
sub cmd_stat ($;$) {
my ($self, $art) = @_;
my $r = art_lookup($self, $art, 0);
return $r unless ref $r;
- my ($n, $mid, undef) = @$r;
+ my ($n, $mid) = @$r;
set_art($self, $art);
"223 $n <$mid> article retrieved - request text separately";
}
}
sub long_response ($$) {
- my ($self, $cb) = @_;
- die "BUG: nested long response" if $self->{long_res};
+ my ($self, $cb) = @_; # cb returns true if more, false if done
my $fd = fileno($self->{sock});
defined $fd or return;
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
my $t0 = now();
- $self->{long_res} = sub {
+ my $long_cb; # DANGER: self-referential
+ $long_cb = sub {
+ # wbuf is unset or empty, here; $cb may add to it
my $more = eval { $cb->() };
if ($@ || !$self->{sock}) { # something bad happened...
- delete $self->{long_res};
-
+ $long_cb = undef;
+ my $diff = now() - $t0;
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
- $@, now() - $t0);
- }
- if ($self->{sock}) {
- update_idle_time($self);
- requeue($self);
- } else {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
+ $@, $diff);
}
+ out($self, " deferred[$fd] aborted - %0.6f", $diff);
+ $self->close;
} elsif ($more) { # $self->{wbuf}:
+ update_idle_time($self);
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
- update_idle_time($self);
- requeue($self);
+ my $wbuf = $self->{wbuf} ||= [];
+ push @$wbuf, $long_cb;
+
+ # wbuf may be populated by $cb, no need to rearm if so:
+ requeue($self) if scalar(@$wbuf) == 1;
} else { # all done!
- delete $self->{long_res};
+ $long_cb = undef;
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
- requeue($self);
+ requeue($self) unless $self->{wbuf};
}
};
- $self->{long_res}->(); # kick off!
+ $self->write($long_cb); # kick off!
undef;
}
}
sub hdr_mid_response ($$$$$$) {
- my ($self, $xhdr, $ng, $n, $mid, $v) = @_; # r: art_lookup result
+ my ($self, $xhdr, $ng, $n, $mid, $v) = @_;
my $res = '';
if ($xhdr) {
$res .= r221 . "\r\n";
# otherwise we can be buffering infinitely w/o backpressure
use constant LINE_MAX => 512; # RFC 977 section 2.3
- my $rbuf = \($self->{rbuf});
+ my $rbuf = $self->{rbuf} // (\(my $x = ''));
my $r = 1;
if (index($$rbuf, "\n") < 0) {
my $t0 = now();
my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
- my $d = $self->{long_res} ?
- " deferred[$fd]" : '';
- out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+ my $pending = $self->{wbuf} ? ' pending' : '';
+ out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
}
return $self->close if $r < 0;
my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
+ if ($len) {
+ $self->{rbuf} = $rbuf;
+ } else {
+ delete $self->{rbuf};
+ }
update_idle_time($self);
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- requeue($self) unless ($self->{long_res} || $self->{wbuf});
+ requeue($self) unless $self->{wbuf};
}
sub not_idle_long ($$) {
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} ||
- $self->{wbuf} || not_idle_long($self, $now));
+ ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
}
1;