]> Sergey Matveev's repositories - public-inbox.git/blobdiff - lib/PublicInbox/NNTP.pm
nntp: lazily allocate and stash rbuf
[public-inbox.git] / lib / PublicInbox / NNTP.pm
index 53de2bca2495c7b49b4369dbc1a94ebae7fa41fc..10a2e158fe739ff171d8ccff4cff2654e5f0f499 100644 (file)
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -45,17 +45,7 @@ sub next_tick () {
        $nextt = undef;
        my $q = $nextq;
        $nextq = [];
-       foreach my $nntp (@$q) {
-               # for request && response protocols, always finish writing
-               # before finishing reading:
-               if (my $long_cb = $nntp->{long_res}) {
-                       $nntp->write($long_cb);
-               } else {
-                       # pipelined request, we bypassed socket-readiness
-                       # checks to get here:
-                       event_step($nntp);
-               }
-       }
+       event_step($_) for @$q;
 }
 
 sub requeue ($) {
@@ -101,6 +91,8 @@ sub expire_old () {
        }
 }
 
+sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
+
 sub new ($$$) {
        my ($class, $sock, $nntpd) = @_;
        my $self = fields::new($class);
@@ -113,11 +105,8 @@ sub new ($$$) {
        }
        $self->SUPER::new($sock, $ev);
        $self->{nntpd} = $nntpd;
-       my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
-       open my $fh, '<:scalar',  \$greet or die "open :scalar: $!";
-       push @$wbuf, $fh;
+       push @$wbuf, \&greet;
        $self->{wbuf} = $wbuf;
-       $self->{rbuf} = '';
        update_idle_time($self);
        $expt ||= PublicInbox::EvCleanup::later(*expire_old);
        $self;
@@ -633,8 +622,7 @@ sub get_range ($$) {
 }
 
 sub long_response ($$) {
-       my ($self, $cb) = @_;
-       die "BUG: nested long response" if $self->{long_res};
+       my ($self, $cb) = @_; # cb returns true if more, false if done
 
        my $fd = fileno($self->{sock});
        defined $fd or return;
@@ -642,36 +630,38 @@ sub long_response ($$) {
        # clients should not be sending us stuff and making us do more
        # work while we are stream a response to them
        my $t0 = now();
-       $self->{long_res} = sub {
+       my $long_cb; # DANGER: self-referential
+       $long_cb = sub {
+               # wbuf is unset or empty, here; $cb may add to it
                my $more = eval { $cb->() };
                if ($@ || !$self->{sock}) { # something bad happened...
-                       delete $self->{long_res};
-
+                       $long_cb = undef;
+                       my $diff = now() - $t0;
                        if ($@) {
                                err($self,
                                    "%s during long response[$fd] - %0.6f",
-                                   $@, now() - $t0);
-                       }
-                       if ($self->{sock}) {
-                               update_idle_time($self);
-                               requeue($self);
-                       } else {
-                               out($self, " deferred[$fd] aborted - %0.6f",
-                                          now() - $t0);
+                                   $@, $diff);
                        }
+                       out($self, " deferred[$fd] aborted - %0.6f", $diff);
+                       $self->close;
                } elsif ($more) { # $self->{wbuf}:
+                       update_idle_time($self);
+
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
-                       update_idle_time($self);
-                       requeue($self);
+                       my $wbuf = $self->{wbuf} ||= [];
+                       push @$wbuf, $long_cb;
+
+                       # wbuf may be populated by $cb, no need to rearm if so:
+                       requeue($self) if scalar(@$wbuf) == 1;
                } else { # all done!
-                       delete $self->{long_res};
+                       $long_cb = undef;
                        res($self, '.');
                        out($self, " deferred[$fd] done - %0.6f", now() - $t0);
-                       requeue($self);
+                       requeue($self) unless $self->{wbuf};
                }
        };
-       $self->{long_res}->(); # kick off!
+       $self->write($long_cb); # kick off!
        undef;
 }
 
@@ -973,7 +963,7 @@ sub event_step {
        # otherwise we can be buffering infinitely w/o backpressure
 
        use constant LINE_MAX => 512; # RFC 977 section 2.3
-       my $rbuf = \($self->{rbuf});
+       my $rbuf = $self->{rbuf} // (\(my $x = ''));
        my $r = 1;
 
        if (index($$rbuf, "\n") < 0) {
@@ -986,19 +976,23 @@ sub event_step {
                my $t0 = now();
                my $fd = fileno($self->{sock});
                $r = eval { process_line($self, $line) };
-               my $d = $self->{long_res} ?
-                       " deferred[$fd]" : '';
-               out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+               my $pending = $self->{wbuf} ? ' pending' : '';
+               out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
        }
 
        return $self->close if $r < 0;
        my $len = bytes::length($$rbuf);
        return $self->close if ($len >= LINE_MAX);
+       if ($len) {
+               $self->{rbuf} = $rbuf;
+       } else {
+               delete $self->{rbuf};
+       }
        update_idle_time($self);
 
        # maybe there's more pipelined data, or we'll have
        # to register it for socket-readiness notifications
-       requeue($self) unless ($self->{long_res} || $self->{wbuf});
+       requeue($self) unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
@@ -1012,8 +1006,7 @@ sub not_idle_long ($$) {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy {
        my ($self, $now) = @_;
-       ($self->{rbuf} ne '' || $self->{long_res} ||
-               $self->{wbuf} || not_idle_long($self, $now));
+       ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;