]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Reinit storage reader on read errors
authorMatt Joiner <anacrolix@gmail.com>
Thu, 8 May 2025 06:01:43 +0000 (16:01 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 8 May 2025 06:01:43 +0000 (16:01 +1000)
client_test.go
reader.go
t.go
torrent.go

index 73ea3630c4b481cc53dbc0fb98b5c3a5e815b15f..e1725f0515cacc85803717f86501e2396d04c449 100644 (file)
@@ -323,20 +323,20 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
                return
        }())
        leecherTorrent.AddClientPeer(seeder)
-       reader := leecherTorrent.NewReader()
-       t.Cleanup(func() { reader.Close() })
-       reader.SetReadahead(0)
-       reader.SetResponsive()
+       rdr := leecherTorrent.NewReader()
+       t.Cleanup(func() { rdr.Close() })
+       rdr.SetReadahead(0)
+       rdr.SetResponsive()
        b := make([]byte, 2)
-       _, err = reader.Seek(3, io.SeekStart)
+       _, err = rdr.Seek(3, io.SeekStart)
        require.NoError(t, err)
-       _, err = io.ReadFull(reader, b)
+       _, err = io.ReadFull(rdr, b)
        assert.Nil(t, err)
        assert.EqualValues(t, "lo", string(b))
-       _, err = reader.Seek(11, io.SeekStart)
+       _, err = rdr.Seek(11, io.SeekStart)
        require.NoError(t, err)
        leecherTorrent.Drop()
-       n, err := reader.Read(b)
+       n, err := rdr.Read(b)
        qt.Assert(t, qt.Equals(err, errTorrentClosed))
        assert.EqualValues(t, 0, n)
 }
index ab6df459f1e2aa3885dfbd63037d9f15b5f9d930..ca5adebf359d0dbc97ff80100af8743c43a9a0d2 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -191,7 +191,7 @@ func (r *reader) readContext(ctx context.Context, b []byte) (n int, err error) {
                r.posChanged()
                r.mu.Unlock()
        }
-       n, err = r.readOnceAt(ctx, b, r.pos)
+       n, err = r.readAt(ctx, b, r.pos)
        if n == 0 {
                if err == nil && len(b) > 0 {
                        panic("expected error")
@@ -264,10 +264,6 @@ func (r *reader) torrentOffset(readerPos int64) int64 {
 
 // Performs at most one successful read to torrent storage.
 func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
-       if pos >= r.length {
-               err = io.EOF
-               return
-       }
        var avail int64
        avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
        if avail == 0 || err != nil {
@@ -279,6 +275,7 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er
        // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to
        // return an error so it wasn't noticed. We now try again if there's a storage cap otherwise
        // convert it to io.UnexpectedEOF.
+       r.initStorageReader()
        n, err = r.storageReader.ReadAt(b1, r.torrentOffset(pos))
        //n, err = r.t.readAt(b1, r.torrentOffset(pos))
        if n != 0 {
@@ -301,11 +298,53 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er
        } else {
                r.slogger().Error("error reading", attrs[:]...)
        }
-       r.afterReadError(firstPieceIndex)
+       return
+}
+
+// Performs at most one successful read to torrent storage. Try reading, first with the storage
+// reader we already have, then after resetting it (in case data moved for
+// completed/incomplete/promoted etc.). Then try resetting the piece completions. Then after all
+// that if the storage is supposed to be flaky, try all over again. TODO: Filter errors and set log
+// levels appropriately.
+func (r *reader) readAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
+       if pos >= r.length {
+               err = io.EOF
+               return
+       }
+       n, err = r.readOnceAt(ctx, b, pos)
+       if err == nil {
+               return
+       }
+       r.slogger().Error("initial read failed", "err", err)
+
+       err = r.clearStorageReader()
+       if err != nil {
+               err = fmt.Errorf("closing storage reader after first read failed: %w", err)
+               return
+       }
+       r.storageReader = nil
+
+       n, err = r.readOnceAt(ctx, b, pos)
+       if err == nil {
+               return
+       }
+       r.slogger().Error("read failed after reader reset", "err", err)
+
+       r.updatePieceCompletion(pos)
+
+       n, err = r.readOnceAt(ctx, b, pos)
+       if err == nil {
+               return
+       }
+       r.slogger().Error("read failed after completion resync", "err", err)
+
        if r.t.hasStorageCap() {
-               // Ensure params weren't modified (Go sux). Recurse to detect infinite loops.
-               return r.readOnceAt(ctx, b, pos)
+               // Ensure params weren't modified (Go sux). Recurse to detect infinite loops. TODO: I expect
+               // only some errors should pass through here, this might cause us to get stuck if we retry
+               // for any error.
+               return r.readAt(ctx, b, pos)
        }
+
        // There should have been something available, avail != 0 here.
        if err == io.EOF {
                err = io.ErrUnexpectedEOF
@@ -313,7 +352,9 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er
        return
 }
 
-func (r *reader) afterReadError(firstPieceIndex int) {
+// We pass pos in case we go ahead and implement multiple reads per ReadAt.
+func (r *reader) updatePieceCompletion(pos int64) {
+       firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
        r.t.cl.lock()
        // I think there's a panic here caused by the Client being closed before obtaining this
        // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI.
@@ -344,7 +385,7 @@ func (r *reader) Close() error {
        r.t.cl.lock()
        r.t.deleteReader(r)
        r.t.cl.unlock()
-       return r.storageReader.Close()
+       return r.clearStorageReader()
 }
 
 func (r *reader) posChanged() {
@@ -394,3 +435,20 @@ func defaultReadaheadFunc(r ReadaheadContext) int64 {
 func (r *reader) slogger() *slog.Logger {
        return r.t.slogger()
 }
+
+func (r *reader) initStorageReader() {
+       if r.storageReader == nil {
+               r.storageReader = r.t.storageReader()
+       }
+}
+
+func (r *reader) clearStorageReader() (err error) {
+       if r.storageReader != nil {
+               err = r.storageReader.Close()
+               if err != nil {
+                       return
+               }
+       }
+       r.storageReader = nil
+       return
+}
diff --git a/t.go b/t.go
index f862b553b4af30c02065f295b0f062249d544237..8a07579f403823de3c59eb99f5f0ed1d70575af0 100644 (file)
--- a/t.go
+++ b/t.go
@@ -40,12 +40,11 @@ func (t *Torrent) NewReader() Reader {
 
 func (t *Torrent) newReader(offset, length int64) Reader {
        r := reader{
-               mu:            t.cl.locker(),
-               t:             t,
-               offset:        offset,
-               length:        length,
-               storageReader: t.storageReader(),
-               ctx:           context.Background(),
+               mu:     t.cl.locker(),
+               t:      t,
+               offset: offset,
+               length: length,
+               ctx:    context.Background(),
        }
        r.readaheadFunc = defaultReadaheadFunc
        t.addReader(&r)
index d56050878091cc2b5dce95aa0c77b65d6054ff0b..5e02c807e11d22919952de3afd74171b23eb7227 100644 (file)
@@ -1656,6 +1656,7 @@ func (t *Torrent) openNewConns() (initiated int) {
        return
 }
 
+// Pulls piece completion state from storage and performs any state updates if it changes.
 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        p := t.piece(piece)
        uncached := t.pieceCompleteUncached(piece)