$self->write($_[1]);
}
+my $pipelineq = [];
+my $next_tick;
+sub process_pipelineq () {
+ $next_tick = undef;
+ my $q = $pipelineq;
+ $pipelineq = [];
+ rbuf_process($_) foreach @$q;
+}
+
# overrides existing Danga::Socket method
sub event_write {
my ($self) = @_;
if ($self->{rbuf} eq '') { # wait for next request
$self->watch_read(1);
} else { # avoid recursion for pipelined requests
- Danga::Socket->AddTimer(0, sub { rbuf_process($self) });
+ push @$pipelineq, $self;
+ $next_tick ||= Danga::Socket->AddTimer(0, *process_pipelineq);
}
}
my $WEAKEN = {}; # string(nntpd) -> nntpd
my $WEAKTIMER;
+my $next_tick;
+my $nextq = [];
+sub next_tick () {
+ $next_tick = undef;
+ my $q = $nextq;
+ $nextq = [];
+ foreach my $nntp (@$q) {
+ # for request && response protocols, always finish writing
+ # before finishing reading:
+ if (my $long_cb = $nntp->{long_res}) {
+ $nntp->write($long_cb);
+ } elsif (&Danga::Socket::POLLIN & $nntp->{event_watch}) {
+ event_read($nntp);
+ }
+ }
+}
+
sub update_idle_time ($) {
my ($self) = @_;
my $tmp = $self->{sock} or return;
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
- Danga::Socket->AddTimer(0, sub {
- $self->write($self->{long_res});
- });
+
+ push @$nextq, $self;
+ $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
} else { # all done!
$self->{long_res} = undef;
$self->watch_read(1);
# and we must double-check again by the time the timer fires
# in case we really did dispatch a read event and started
# another long response.
- Danga::Socket->AddTimer(0, sub {
- if (&Danga::Socket::POLLIN & $self->{event_watch}) {
- event_read($self);
- }
- });
+ push @$nextq, $self;
+ $next_tick ||= Danga::Socket->AddTimer(0, *next_tick);
}
$rv;
}