]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
Merge remote-tracking branch 'origin/purge'
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 022bb809585ca38edb1164d83807bd6063fc4db2..63d5870b045756463b8d78f7b16c4689481a15be 100644 (file)
@@ -51,8 +51,16 @@ sub next_tick () {
                # before finishing reading:
                if (my $long_cb = $nntp->{long_res}) {
                        $nntp->write($long_cb);
-               } elsif (&Danga::Socket::POLLIN & $nntp->{event_watch}) {
+               } else {
+                       # pipelined request, we bypassed socket-readiness
+                       # checks to get here:
                        event_read($nntp);
+
+                       # maybe there's more pipelined data, or we'll have
+                       # to register it for socket-readiness notifications
+                       if (!$nntp->{long_res} && !$nntp->{closed}) {
+                               check_read($nntp);
+                       }
                }
        }
 }
@@ -507,6 +515,7 @@ sub set_art {
 sub _header ($) {
        my $hdr = $_[0]->header_obj->as_string;
        utf8::encode($hdr);
+       $hdr =~ s/(?<!\r)\n/\r\n/sg;
        $hdr
 }
 
@@ -609,7 +618,7 @@ sub long_response ($$) {
                                           now() - $t0);
                        } else {
                                update_idle_time($self);
-                               $self->watch_read(1);
+                               check_read($self);
                        }
                } elsif ($more) { # $self->{write_buf_size}:
                        # no recursion, schedule another call ASAP
@@ -620,7 +629,7 @@ sub long_response ($$) {
                        $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
                } else { # all done!
                        $self->{long_res} = undef;
-                       $self->watch_read(1);
+                       check_read($self);
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
                }
@@ -894,7 +903,7 @@ sub more ($$) {
 sub do_write ($$) {
        my ($self, $data) = @_;
        my $done = $self->write($data);
-       return if $self->{closed};
+       return if $self->{closed};
 
        # Do not watch for readability if we have data in the queue,
        # instead re-enable watching for readability when we can
@@ -968,10 +977,9 @@ sub event_read {
        update_idle_time($self);
 }
 
-sub watch_read {
-       my ($self, $bool) = @_;
-       my $rv = $self->SUPER::watch_read($bool);
-       if ($bool && index($self->{rbuf}, "\n") >= 0) {
+sub check_read {
+       my ($self) = @_;
+       if (index($self->{rbuf}, "\n") >= 0) {
                # Force another read if there is a pipelined request.
                # We don't know if the socket has anything for us to read,
                # and we must double-check again by the time the timer fires
@@ -979,8 +987,11 @@ sub watch_read {
                # another long response.
                push @$nextq, $self;
                $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+       } else {
+               # no pipelined requests available, let the kernel know
+               # to wake us up if there's more
+               $self->watch_read(1); # Danga::Socket::watch_read
        }
-       $rv;
 }
 
 sub not_idle_long ($$) {