return
}())
leecherTorrent.AddClientPeer(seeder)
- reader := leecherTorrent.NewReader()
- t.Cleanup(func() { reader.Close() })
- reader.SetReadahead(0)
- reader.SetResponsive()
+ rdr := leecherTorrent.NewReader()
+ t.Cleanup(func() { rdr.Close() })
+ rdr.SetReadahead(0)
+ rdr.SetResponsive()
b := make([]byte, 2)
- _, err = reader.Seek(3, io.SeekStart)
+ _, err = rdr.Seek(3, io.SeekStart)
require.NoError(t, err)
- _, err = io.ReadFull(reader, b)
+ _, err = io.ReadFull(rdr, b)
assert.Nil(t, err)
assert.EqualValues(t, "lo", string(b))
- _, err = reader.Seek(11, io.SeekStart)
+ _, err = rdr.Seek(11, io.SeekStart)
require.NoError(t, err)
leecherTorrent.Drop()
- n, err := reader.Read(b)
+ n, err := rdr.Read(b)
qt.Assert(t, qt.Equals(err, errTorrentClosed))
assert.EqualValues(t, 0, n)
}
r.posChanged()
r.mu.Unlock()
}
- n, err = r.readOnceAt(ctx, b, r.pos)
+ n, err = r.readAt(ctx, b, r.pos)
if n == 0 {
if err == nil && len(b) > 0 {
panic("expected error")
// Performs at most one successful read to torrent storage.
func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
- if pos >= r.length {
- err = io.EOF
- return
- }
var avail int64
avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
if avail == 0 || err != nil {
// I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to
// return an error so it wasn't noticed. We now try again if there's a storage cap otherwise
// convert it to io.UnexpectedEOF.
+ r.initStorageReader()
n, err = r.storageReader.ReadAt(b1, r.torrentOffset(pos))
//n, err = r.t.readAt(b1, r.torrentOffset(pos))
if n != 0 {
} else {
r.slogger().Error("error reading", attrs[:]...)
}
- r.afterReadError(firstPieceIndex)
+ return
+}
+
+// Performs at most one successful read to torrent storage. Try reading, first with the storage
+// reader we already have, then after resetting it (in case data moved for
+// completed/incomplete/promoted etc.). Then try resetting the piece completions. Then after all
+// that if the storage is supposed to be flaky, try all over again. TODO: Filter errors and set log
+// levels appropriately.
+func (r *reader) readAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
+ if pos >= r.length {
+ err = io.EOF
+ return
+ }
+ n, err = r.readOnceAt(ctx, b, pos)
+ if err == nil {
+ return
+ }
+ r.slogger().Error("initial read failed", "err", err)
+
+ err = r.clearStorageReader()
+ if err != nil {
+ err = fmt.Errorf("closing storage reader after first read failed: %w", err)
+ return
+ }
+ r.storageReader = nil
+
+ n, err = r.readOnceAt(ctx, b, pos)
+ if err == nil {
+ return
+ }
+ r.slogger().Error("read failed after reader reset", "err", err)
+
+ r.updatePieceCompletion(pos)
+
+ n, err = r.readOnceAt(ctx, b, pos)
+ if err == nil {
+ return
+ }
+ r.slogger().Error("read failed after completion resync", "err", err)
+
if r.t.hasStorageCap() {
- // Ensure params weren't modified (Go sux). Recurse to detect infinite loops.
- return r.readOnceAt(ctx, b, pos)
+ // Ensure params weren't modified (Go sux). Recurse to detect infinite loops. TODO: I expect
+ // only some errors should pass through here, this might cause us to get stuck if we retry
+ // for any error.
+ return r.readAt(ctx, b, pos)
}
+
// There should have been something available, avail != 0 here.
if err == io.EOF {
err = io.ErrUnexpectedEOF
return
}
-func (r *reader) afterReadError(firstPieceIndex int) {
+// We pass pos in case we go ahead and implement multiple reads per ReadAt.
+func (r *reader) updatePieceCompletion(pos int64) {
+ firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
r.t.cl.lock()
// I think there's a panic here caused by the Client being closed before obtaining this
// lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI.
r.t.cl.lock()
r.t.deleteReader(r)
r.t.cl.unlock()
- return r.storageReader.Close()
+ return r.clearStorageReader()
}
func (r *reader) posChanged() {
func (r *reader) slogger() *slog.Logger {
return r.t.slogger()
}
+
+func (r *reader) initStorageReader() {
+ if r.storageReader == nil {
+ r.storageReader = r.t.storageReader()
+ }
+}
+
+func (r *reader) clearStorageReader() (err error) {
+ if r.storageReader != nil {
+ err = r.storageReader.Close()
+ if err != nil {
+ return
+ }
+ }
+ r.storageReader = nil
+ return
+}