reader.go | 19 +++++++++++++++++-- diff --git a/reader.go b/reader.go index 78a2ff00bf2df4d86b48bb4ec3a6c8f2bc7f0166..0d39522f91bfd4e42b6acd3104e99f0a8943cb5b 100644 --- a/reader.go +++ b/reader.go @@ -40,8 +40,11 @@ // Ensure operations that change the position are exclusive, like Read() and Seek(). opMu sync.Mutex // Required when modifying pos and readahead, or reading them without opMu. - mu sync.Locker - pos int64 + mu sync.Locker + pos int64 + // Reads have been initiated since the last seek. This is used to prevent readahead occuring + // after a seek or with a new reader at the starting position. + reading bool readahead int64 // Function to dynamically calculate readahead. If nil, readahead is static. readaheadFunc func() int64 @@ -113,6 +116,9 @@ // Needs to be at least 1, because [x, x) means we don't want // anything. ra = 1 } + if !r.reading { + ra = 0 + } if ra > r.length-r.pos { ra = r.length - r.pos } @@ -129,6 +135,14 @@ // Hmmm, if a Read gets stuck, this means you can't change position for other purposes. That // seems reasonable, but unusual. r.opMu.Lock() defer r.opMu.Unlock() + if len(b) > 0 { + r.reading = true + // TODO: Rework reader piece priorities so we don't have to push updates in to the Client + // and take the lock here. + r.mu.Lock() + r.posChanged() + r.mu.Unlock() + } n, err = r.readOnceAt(ctx, b, r.pos) if n == 0 { if err == nil && len(b) > 0 { @@ -280,6 +294,7 @@ } if newPos == r.pos { return } + r.reading = false r.pos = newPos r.contiguousReadStartPos = newPos