use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
$nextt = undef;
my $q = $nextq;
$nextq = [];
- foreach my $nntp (@$q) {
- # for request && response protocols, always finish writing
- # before finishing reading:
- if (my $long_cb = $nntp->{long_res}) {
- $nntp->write($long_cb);
- } else {
- # pipelined request, we bypassed socket-readiness
- # checks to get here:
- event_step($nntp);
- }
- }
+ event_step($_) for @$q;
}
sub requeue ($) {
}
sub long_response ($$) {
- my ($self, $cb) = @_;
- die "BUG: nested long response" if $self->{long_res};
+ my ($self, $cb) = @_; # cb returns true if more, false if done
my $fd = fileno($self->{sock});
defined $fd or return;
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
my $t0 = now();
- $self->{long_res} = sub {
+ my $long_cb; # DANGER: self-referential
+ $long_cb = sub {
+ # wbuf is unset or empty, here; $cb may add to it
my $more = eval { $cb->() };
if ($@ || !$self->{sock}) { # something bad happened...
- delete $self->{long_res};
-
+ $long_cb = undef;
+ my $diff = now() - $t0;
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
- $@, now() - $t0);
- }
- if ($self->{sock}) {
- update_idle_time($self);
- requeue($self);
- } else {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
+ $@, $diff);
}
+ out($self, " deferred[$fd] aborted - %0.6f", $diff);
+ $self->close;
} elsif ($more) { # $self->{wbuf}:
+ update_idle_time($self);
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
- update_idle_time($self);
- requeue($self);
+ my $wbuf = $self->{wbuf} ||= [];
+ push @$wbuf, $long_cb;
+
+ # wbuf may be populated by $cb, no need to rearm if so:
+ requeue($self) if scalar(@$wbuf) == 1;
} else { # all done!
- delete $self->{long_res};
+ $long_cb = undef;
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
- requeue($self);
+ requeue($self) unless $self->{wbuf};
}
};
- $self->{long_res}->(); # kick off!
+ $self->write($long_cb); # kick off!
undef;
}
my $t0 = now();
my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
- my $d = $self->{long_res} ?
- " deferred[$fd]" : '';
- out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+ my $pending = $self->{wbuf} ? ' pending' : '';
+ out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
}
return $self->close if $r < 0;
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- requeue($self) unless ($self->{long_res} || $self->{wbuf});
+ requeue($self) unless $self->{wbuf};
}
sub not_idle_long ($$) {
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} ||
- $self->{wbuf} || not_idle_long($self, $now));
+ ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now));
}
1;