X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FNNTP.pm;h=8f8668591e51ce0e0ffce62f9caa28fb3a1bd223;hb=ebb1ae89f3b4a52daaf581e727c0dcbe6b7c103e;hp=0e9108249b07207643252df3cf84665168e4af3e;hpb=f033d9bff75455c6c06ce39de8c60816868f1457;p=public-inbox.git diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 0e910824..8f866859 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -4,7 +4,7 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(Danga::Socket); -use fields qw(nntpd article ng long_res); +use fields qw(nntpd article rbuf ng long_res); use PublicInbox::Msgmap; use PublicInbox::GitCatFile; use PublicInbox::MID qw(mid2path); @@ -32,6 +32,7 @@ sub new ($$$) { $self->SUPER::new($sock); $self->{nntpd} = $nntpd; res($self, '201 server ready - post via email'); + $self->{rbuf} = ''; $self->watch_read(1); $self; } @@ -762,20 +763,44 @@ sub event_write { sub event_read { my ($self) = @_; use constant LINE_MAX => 512; # RFC 977 section 2.3 + my $line; my $r = 1; - my $buf = $self->read(LINE_MAX) or return $self->close; - while ($r > 0 && $$buf =~ s/\A\s*([^\r\n]+)\r?\n//) { - my $line = $1; +again: + while ($r > 0 && $self->{rbuf} =~ s/\A\s*([^\r\n]+)\r?\n//) { + $line = $1; my $t0 = now(); $r = eval { $self->process_line($line) }; my $d = $self->{long_res} ? ' deferred['.fileno($self->{sock}).']' : ''; out($self, "$line - %0.6f$d", now() - $t0); } + unless (defined $line) { + my $buf = $self->read(LINE_MAX) or return $self->close; + $self->{rbuf} .= $$buf; + goto again; + } + return $self->close if $r < 0; - my $len = bytes::length($$buf); + my $len = bytes::length($self->{rbuf}); return $self->close if ($len >= LINE_MAX); - $self->push_back_read($buf) if ($len); +} + +sub watch_read { + my ($self, $bool) = @_; + my $rv = $self->SUPER::watch_read($bool); + if ($bool && $self->{rbuf} ne '') { + # Force another read if there is a pipelined request. + # We don't know if the socket has anything for us to read, + # and we must double-check again by the time the timer fires + # in case we really did dispatch a read event and started + # another long response. + Danga::Socket->AddTimer(0, sub { + if (&Danga::Socket::POLLIN & $self->{event_watch}) { + $self->event_read; + } + }); + } + $rv; } 1;