We can get rid of the {long_res} field and reuse the write
buffer ordering logic to prevent nesting of responses from
requeue.
On FreeBSD, this fixes a problem of callbacks firing twice
because kqueue as event_step is now our only callback entry
point.
There's a slight change in the stdout "logging" format, in
that we can no longer distinguish between writes blocked
due to slow clients or deferred long responses. Not sure
if this affects anybody parsing logs or not, but preserving
the old format could prove expensive and not worth the
effort.
use strict;
use warnings;
use base qw(PublicInbox::DS);
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
$nextt = undef;
my $q = $nextq;
$nextq = [];
$nextt = undef;
my $q = $nextq;
$nextq = [];
- foreach my $nntp (@$q) {
- # for request && response protocols, always finish writing
- # before finishing reading:
- if (my $long_cb = $nntp->{long_res}) {
- $nntp->write($long_cb);
- } else {
- # pipelined request, we bypassed socket-readiness
- # checks to get here:
- event_step($nntp);
- }
- }
+ event_step($_) for @$q;
}
sub long_response ($$) {
}
sub long_response ($$) {
- my ($self, $cb) = @_;
- die "BUG: nested long response" if $self->{long_res};
+ my ($self, $cb) = @_; # cb returns true if more, false if done
my $fd = fileno($self->{sock});
defined $fd or return;
my $fd = fileno($self->{sock});
defined $fd or return;
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
my $t0 = now();
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
my $t0 = now();
- $self->{long_res} = sub {
+ my $long_cb; # DANGER: self-referential
+ $long_cb = sub {
+ # wbuf is unset or empty, here; $cb may add to it
my $more = eval { $cb->() };
if ($@ || !$self->{sock}) { # something bad happened...
my $more = eval { $cb->() };
if ($@ || !$self->{sock}) { # something bad happened...
- delete $self->{long_res};
-
+ $long_cb = undef;
+ my $diff = now() - $t0;
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
- $@, now() - $t0);
- }
- if ($self->{sock}) {
- update_idle_time($self);
- requeue($self);
- } else {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
+ out($self, " deferred[$fd] aborted - %0.6f", $diff);
+ $self->close;
} elsif ($more) { # $self->{wbuf}:
} elsif ($more) { # $self->{wbuf}:
+ update_idle_time($self);
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
# no recursion, schedule another call ASAP
# but only after all pending writes are done
- update_idle_time($self);
- requeue($self);
+ my $wbuf = $self->{wbuf} ||= [];
+ push @$wbuf, $long_cb;
+
+ # wbuf may be populated by $cb, no need to rearm if so:
+ requeue($self) if scalar(@$wbuf) == 1;
- delete $self->{long_res};
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+ requeue($self) unless $self->{wbuf};
- $self->{long_res}->(); # kick off!
+ $self->write($long_cb); # kick off!
my $t0 = now();
my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
my $t0 = now();
my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
- my $d = $self->{long_res} ?
- " deferred[$fd]" : '';
- out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+ my $pending = $self->{wbuf} ? ' pending' : '';
+ out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
}
return $self->close if $r < 0;
}
return $self->close if $r < 0;
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- requeue($self) unless ($self->{long_res} || $self->{wbuf});
+ requeue($self) unless $self->{wbuf};
}
sub not_idle_long ($$) {
}
sub not_idle_long ($$) {
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} ||
- $self->{wbuf} || not_idle_long($self, $now));
+ ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now));