]> Sergey Matveev's repositories - public-inbox.git/commitdiff
ds: lazy-initialize wbuf
authorEric Wong <e@80x24.org>
Mon, 24 Jun 2019 02:52:09 +0000 (02:52 +0000)
committerEric Wong <e@80x24.org>
Mon, 24 Jun 2019 05:26:25 +0000 (05:26 +0000)
We don't need write buffering unless we encounter slow clients
requesting large responses.  So don't waste a hash slot or
(empty) arrayref for it.

lib/PublicInbox/DS.pm
lib/PublicInbox/EvCleanup.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/NNTP.pm

index f1b7bab7ce0b3fc09eced454003b6be402002e6b..d07620a87215892a846ac0112ea6ad271bed5700 100644 (file)
@@ -402,8 +402,6 @@ sub new {
     Carp::cluck("undef sock and/or fd in PublicInbox::DS->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
         unless $sock && $fd;
 
-    $self->{wbuf} = [];
-
     my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
 
     _InitPoller();
@@ -454,7 +452,7 @@ sub close {
     # we need to flush our write buffer, as there may
     # be self-referential closures (sub { $client->close })
     # preventing the object from being destroyed
-    @{$self->{wbuf}} = ();
+    delete $self->{wbuf};
 
     # if we're using epoll, we have to remove this from our epoll fd so we stop getting
     # notifications about it
@@ -483,8 +481,8 @@ sub close {
 # returns 1 if done, 0 if incomplete
 sub flush_write ($) {
     my ($self) = @_;
+    my $wbuf = $self->{wbuf} or return 1;
     my $sock = $self->{sock} or return 1;
-    my $wbuf = $self->{wbuf};
 
     while (my $bref = $wbuf->[0]) {
         my $ref = ref($bref);
@@ -512,6 +510,7 @@ sub flush_write ($) {
         }
     } # while @$wbuf
 
+    delete $self->{wbuf};
     $self->watch_write(0);
     1; # all done
 }
@@ -538,8 +537,7 @@ sub write {
     my $sock = $self->{sock} or return 1;
     my $ref = ref $data;
     my $bref = $ref ? $data : \$data;
-    my $wbuf = $self->{wbuf};
-    if (@$wbuf) { # already buffering, can't write more...
+    if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
         push @$wbuf, $bref;
         return 0;
     } elsif ($ref eq 'CODE') {
@@ -552,10 +550,10 @@ sub write {
         if (defined $written) {
             return 1 if $written == $to_write;
             $self->{wbuf_off} = $written;
-            push @$wbuf, $bref;
+            $self->{wbuf} = [ $bref ];
             return flush_write($self); # try until EAGAIN
         } elsif ($! == EAGAIN) {
-            push @$wbuf, $bref;
+            $self->{wbuf} = [ $bref ];
             $self->watch_write(1);
         } else {
             $self->close;
index c64e2388512508cfea87624007adce585eadfc24..bd4dda117192f9dfa107070fff63890fea8cf1c9 100644 (file)
@@ -29,7 +29,7 @@ sub once_init () {
        # never drains wbuf.  We can avoid wasting a hash slot by
        # stuffing the read-end of the pipe into the never-to-be-touched
        # wbuf
-       push @{$self->{wbuf}}, $r;
+       $self->{wbuf} = $r;
        $self;
 }
 
index dff5928681a9d904a281bb42f0f89fc5faa381e2..9a43069f1eda9eb31a2eead6a903dc04f4ca6d7a 100644 (file)
@@ -67,11 +67,8 @@ sub new ($$$) {
 sub event_step { # called by PublicInbox::DS
        my ($self) = @_;
 
-       my $wbuf = $self->{wbuf};
-       if (@$wbuf) {
-               $self->write(undef);
-               return if !$self->{sock} || scalar(@$wbuf);
-       }
+       return unless $self->flush_write && $self->{sock};
+
        # only read more requests if we've drained the write buffer,
        # otherwise we can be buffering infinitely w/o backpressure
 
@@ -268,7 +265,7 @@ sub getline_cb ($$$) {
                        $write->($buf); # may close in PublicInbox::DS::write
                        if ($self->{sock}) {
                                my $next = $self->{pull};
-                               if (scalar @{$self->{wbuf}}) {
+                               if ($self->{wbuf}) {
                                        $self->write($next);
                                } else {
                                        PublicInbox::EvCleanup::asap($next);
@@ -323,7 +320,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 sub more ($$) {
        my $self = $_[0];
        return unless $self->{sock};
-       if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
+       if (MSG_MORE && !$self->{wbuf}) {
                my $n = send($self->{sock}, $_[1], MSG_MORE);
                if (defined $n) {
                        my $nlen = length($_[1]) - $n;
@@ -490,7 +487,7 @@ sub close {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy () {
        my ($self) = @_;
-       ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}}));
+       ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
 }
 
 1;
index 261a01e0fa477f0cd1b200c137c3c6ab25286fe8..46ea188c0127c8a0791015b1d475861f6aff9a39 100644 (file)
@@ -46,7 +46,7 @@ sub main_cb ($$$) {
                if ($r) {
                        $fh->write($$bref);
                        if ($http->{sock}) { # !closed
-                               if (scalar @{$http->{wbuf}}) {
+                               if ($http->{wbuf}) {
                                        $self->watch_read(0);
                                        $http->write(restart_read_cb($self));
                                }
index 0a473e42729707058abc6e676eff5e56278024b3..d9097cc792b1231c9236181c6a95611d507e905e 100644 (file)
@@ -644,7 +644,7 @@ sub long_response ($$) {
                                out($self, " deferred[$fd] aborted - %0.6f",
                                           now() - $t0);
                        }
-               } elsif ($more) { # scalar @{$self->{wbuf}}:
+               } elsif ($more) { # $self->{wbuf}:
                        # no recursion, schedule another call ASAP
                        # but only after all pending writes are done
                        update_idle_time($self);
@@ -950,7 +950,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
 
 sub do_more ($$) {
        my ($self, $data) = @_;
-       if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
+       if (MSG_MORE && !$self->{wbuf}) {
                my $n = send($self->{sock}, $data, MSG_MORE);
                if (defined $n) {
                        my $dlen = length($data);
@@ -963,15 +963,11 @@ sub do_more ($$) {
 
 sub event_step {
        my ($self) = @_;
-       return unless $self->{sock};
 
-       my $wbuf = $self->{wbuf};
-       if (@$wbuf) {
-               update_idle_time($self);
-               $self->write(undef);
-               return if !$self->{sock} || scalar(@$wbuf);
-       }
+       return unless $self->flush_write && $self->{sock};
        return if $self->{long_res};
+
+       update_idle_time($self);
        # only read more requests if we've drained the write buffer,
        # otherwise we can be buffering infinitely w/o backpressure
 
@@ -1035,7 +1031,7 @@ sub not_idle_long ($$) {
 sub busy {
        my ($self, $now) = @_;
        ($self->{rbuf} ne '' || $self->{long_res} ||
-               scalar(@{$self->{wbuf}}) || not_idle_long($self, $now));
+               $self->{wbuf} || not_idle_long($self, $now));
 }
 
 1;