From e8b496bee66d109d2b8b59aede45c56dc9b0f15c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 9 Sep 2021 20:55:09 +1000 Subject: [PATCH] Add default sqrt readahead algorithm --- reader.go | 46 ++++++++++++++++++++++++++++++++++------------ t.go | 10 +++++----- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/reader.go b/reader.go index 97ae0ed6..745409f4 100644 --- a/reader.go +++ b/reader.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "sync" "github.com/anacrolix/log" @@ -43,6 +44,10 @@ type reader struct { mu sync.Locker pos int64 readahead int64 + // Function to dynamically calculate readahead. If nil, readahead is static. + readaheadFunc func() int64 + // Position that reads have continued contiguously from. + contiguousReadStartPos int64 // The cached piece range this reader wants downloaded. The zero value corresponds to nothing. // We cache this so that changes can be detected, and bubbled up to the Torrent only as // required. @@ -65,6 +70,7 @@ func (r *reader) SetNonResponsive() { func (r *reader) SetReadahead(readahead int64) { r.mu.Lock() r.readahead = readahead + r.readaheadFunc = nil r.mu.Unlock() r.t.cl.lock() defer r.t.cl.unlock() @@ -100,6 +106,9 @@ func (r *reader) available(off, max int64) (ret int64) { // Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces. func (r *reader) piecesUncached() (ret pieceRange) { ra := r.readahead + if r.readaheadFunc != nil { + ra = r.readaheadFunc() + } if ra < 1 { // Needs to be at least 1, because [x, x) means we don't want // anything. @@ -249,23 +258,31 @@ func (r *reader) posChanged() { r.t.readerPosChanged(from, to) } -func (r *reader) Seek(off int64, whence int) (ret int64, err error) { +func (r *reader) Seek(off int64, whence int) (newPos int64, err error) { r.opMu.Lock() defer r.opMu.Unlock() - r.mu.Lock() defer r.mu.Unlock() - switch whence { - case io.SeekStart: - r.pos = off - case io.SeekCurrent: - r.pos += off - case io.SeekEnd: - r.pos = r.length + off - default: - err = errors.New("bad whence") + newPos, err = func() (int64, error) { + switch whence { + case io.SeekStart: + return off, err + case io.SeekCurrent: + return r.pos + off, nil + case io.SeekEnd: + return r.length + off, nil + default: + return r.pos, errors.New("bad whence") + } + }() + if err != nil { + return } - ret = r.pos + if newPos == r.pos { + return + } + r.pos = newPos + r.contiguousReadStartPos = newPos r.posChanged() return @@ -274,3 +291,8 @@ func (r *reader) Seek(off int64, whence int) (ret int64, err error) { func (r *reader) log(m log.Msg) { r.t.logger.Log(m.Skip(1)) } + +// Implementation inspired from an arbitrary comment I found on HN. +func (r *reader) sqrtReadahead() int64 { + return int64(math.Sqrt(float64(r.pos - r.contiguousReadStartPos))) +} diff --git a/t.go b/t.go index 17082729..8da39883 100644 --- a/t.go +++ b/t.go @@ -37,12 +37,12 @@ func (t *Torrent) NewReader() Reader { func (t *Torrent) newReader(offset, length int64) Reader { r := reader{ - mu: t.cl.locker(), - t: t, - readahead: 5 * 1024 * 1024, - offset: offset, - length: length, + mu: t.cl.locker(), + t: t, + offset: offset, + length: length, } + r.readaheadFunc = r.sqrtReadahead t.addReader(&r) return &r } -- 2.44.0