Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{wbuf} = [];
-
my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
_InitPoller();
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
# preventing the object from being destroyed
- @{$self->{wbuf}} = ();
+ delete $self->{wbuf};
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
+ my $wbuf = $self->{wbuf} or return 1;
my $sock = $self->{sock} or return 1;
- my $wbuf = $self->{wbuf};
while (my $bref = $wbuf->[0]) {
my $ref = ref($bref);
}
} # while @$wbuf
+ delete $self->{wbuf};
$self->watch_write(0);
1; # all done
}
my $sock = $self->{sock} or return 1;
my $ref = ref $data;
my $bref = $ref ? $data : \$data;
- my $wbuf = $self->{wbuf};
- if (@$wbuf) { # already buffering, can't write more...
+ if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
push @$wbuf, $bref;
return 0;
} elsif ($ref eq 'CODE') {
if (defined $written) {
return 1 if $written == $to_write;
$self->{wbuf_off} = $written;
- push @$wbuf, $bref;
+ $self->{wbuf} = [ $bref ];
return flush_write($self); # try until EAGAIN
} elsif ($! == EAGAIN) {
- push @$wbuf, $bref;
+ $self->{wbuf} = [ $bref ];
$self->watch_write(1);
} else {
$self->close;
# never drains wbuf. We can avoid wasting a hash slot by
# stuffing the read-end of the pipe into the never-to-be-touched
# wbuf
- push @{$self->{wbuf}}, $r;
+ $self->{wbuf} = $r;
$self;
}
sub event_step { # called by PublicInbox::DS
my ($self) = @_;
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- $self->write(undef);
- return if !$self->{sock} || scalar(@$wbuf);
- }
+ return unless $self->flush_write && $self->{sock};
+
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
$write->($buf); # may close in PublicInbox::DS::write
if ($self->{sock}) {
my $next = $self->{pull};
- if (scalar @{$self->{wbuf}}) {
+ if ($self->{wbuf}) {
$self->write($next);
} else {
PublicInbox::EvCleanup::asap($next);
sub more ($$) {
my $self = $_[0];
return unless $self->{sock};
- if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
+ if (MSG_MORE && !$self->{wbuf}) {
my $n = send($self->{sock}, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = length($_[1]) - $n;
# for graceful shutdown in PublicInbox::Daemon:
sub busy () {
my ($self) = @_;
- ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}}));
+ ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
}
1;
if ($r) {
$fh->write($$bref);
if ($http->{sock}) { # !closed
- if (scalar @{$http->{wbuf}}) {
+ if ($http->{wbuf}) {
$self->watch_read(0);
$http->write(restart_read_cb($self));
}
out($self, " deferred[$fd] aborted - %0.6f",
now() - $t0);
}
- } elsif ($more) { # scalar @{$self->{wbuf}}:
+ } elsif ($more) { # $self->{wbuf}:
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
sub do_more ($$) {
my ($self, $data) = @_;
- if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
+ if (MSG_MORE && !$self->{wbuf}) {
my $n = send($self->{sock}, $data, MSG_MORE);
if (defined $n) {
my $dlen = length($data);
sub event_step {
my ($self) = @_;
- return unless $self->{sock};
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- update_idle_time($self);
- $self->write(undef);
- return if !$self->{sock} || scalar(@$wbuf);
- }
+ return unless $self->flush_write && $self->{sock};
return if $self->{long_res};
+
+ update_idle_time($self);
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
sub busy {
my ($self, $now) = @_;
($self->{rbuf} ne '' || $self->{long_res} ||
- scalar(@{$self->{wbuf}}) || not_idle_long($self, $now));
+ $self->{wbuf} || not_idle_long($self, $now));
}
1;