type rateLimitedReader struct {
l *rate.Limiter
r io.Reader
-
- // This is the time of the last Read's reservation.
- lastRead time.Time
-}
-
-func (me *rateLimitedReader) justRead(b []byte) (n int, err error) {
- n, err = me.r.Read(b)
- me.lastRead = time.Now()
- return
}
func (me *rateLimitedReader) Read(b []byte) (n int, err error) {
b = b[:min(len(b), me.l.Burst())]
}
t := time.Now()
- n, err = me.justRead(b)
+ n, err = me.r.Read(b)
r := me.l.ReserveN(t, n)
panicif.False(r.OK())
time.Sleep(r.DelayFrom(t))
+++ /dev/null
-package torrent
-
-import (
- "io"
- "log"
- "math/rand"
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "golang.org/x/time/rate"
-)
-
-func writeN(ws []io.Writer, n int) error {
- b := make([]byte, n)
- for _, w := range ws[1:] {
- n1 := rand.Intn(n)
- wn, err := w.Write(b[:n1])
- if wn != n1 {
- if err == nil {
- panic(n1)
- }
- return err
- }
- n -= n1
- }
- wn, err := ws[0].Write(b[:n])
- if wn != n {
- if err == nil {
- panic(n)
- }
- }
- return err
-}
-
-func TestRateLimitReaders(t *testing.T) {
- const (
- numReaders = 2
- bytesPerSecond = 100
- burst = 5
- readSize = 6
- writeRounds = 10
- bytesPerRound = 12
- )
- control := rate.NewLimiter(bytesPerSecond, burst)
- shared := rate.NewLimiter(bytesPerSecond, burst)
- var (
- ws []io.Writer
- cs []io.Closer
- )
- wg := sync.WaitGroup{}
- type read struct {
- N int
- // When the read was allowed.
- At time.Time
- }
- reads := make(chan read)
- done := make(chan struct{})
- for i := 0; i < numReaders; i += 1 {
- r, w := io.Pipe()
- ws = append(ws, w)
- cs = append(cs, w)
- wg.Add(1)
- go func() {
- defer wg.Done()
- r := rateLimitedReader{
- l: shared,
- r: r,
- }
- b := make([]byte, readSize)
- for {
- n, err := r.Read(b)
- select {
- case reads <- read{n, r.lastRead}:
- case <-done:
- return
- }
- if err == io.EOF {
- return
- }
- if err != nil {
- panic(err)
- }
- }
- }()
- }
- closeAll := func() {
- for _, c := range cs {
- c.Close()
- }
- }
- defer func() {
- close(done)
- closeAll()
- wg.Wait()
- }()
- written := 0
- go func() {
- for i := 0; i < writeRounds; i += 1 {
- err := writeN(ws, bytesPerRound)
- if err != nil {
- log.Printf("error writing: %s", err)
- break
- }
- written += bytesPerRound
- }
- closeAll()
- wg.Wait()
- close(reads)
- }()
- totalBytesRead := 0
- started := time.Now()
- for r := range reads {
- totalBytesRead += r.N
- require.False(t, r.At.IsZero())
- // Copy what the reader should have done with its reservation.
- res := control.ReserveN(r.At, r.N)
- // If we don't have to wait with the control, the reader has gone too
- // fast.
- if res.Delay() > 0 {
- log.Printf("%d bytes not allowed at %s", r.N, time.Since(started))
- t.FailNow()
- }
- }
- assert.EqualValues(t, writeRounds*bytesPerRound, totalBytesRead)
-}