]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Refactor Reader.readOnceAt
authorMatt Joiner <anacrolix@gmail.com>
Mon, 28 Apr 2025 01:36:11 +0000 (11:36 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 28 Apr 2025 01:36:11 +0000 (11:36 +1000)
Looks like there was some errors that could be dropped.
We also handle a few corner cases and ReadAt returning 0 being handled non-canonically.

bad-storage.go [moved from bad_storage.go with 92% similarity]
reader.go
storage/interface.go

similarity index 92%
rename from bad_storage.go
rename to bad-storage.go
index 7a2537a6147603b79a808228883d3db8b6a61d65..cee768651b7a2fbd4a887bb71f1f475f6feab92e 100644 (file)
@@ -20,8 +20,12 @@ func (bs badStorage) OpenTorrent(
        *metainfo.Info,
        metainfo.Hash,
 ) (storage.TorrentImpl, error) {
+       capFunc := func() (cap int64, capped bool) {
+               return -1, true
+       }
        return storage.TorrentImpl{
-               Piece: bs.Piece,
+               Piece:    bs.Piece,
+               Capacity: &capFunc,
        }, nil
 }
 
index 2af99ce2664227af0f04a1ab4bc226547a81403b..f615d4fe8ecd33e6537606ffab0be86502d77eb6 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -5,10 +5,12 @@ import (
        "errors"
        "fmt"
        "io"
+       "log/slog"
        "sync"
 
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2"
+       "github.com/anacrolix/missinggo/v2/panicif"
 )
 
 // Accesses Torrent data via a Client. Reads block until the data is available. Seeks and readahead
@@ -261,51 +263,73 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er
                err = io.EOF
                return
        }
-       for {
-               var avail int64
-               avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
-               if avail == 0 {
-                       return
-               }
-               firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
-               firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength
-               b1 := missinggo.LimitLen(b, avail)
-               n, err = r.t.readAt(b1, r.torrentOffset(pos))
-               if n != 0 {
-                       err = nil
-                       return
-               }
-               if r.t.closed.IsSet() {
-                       err = fmt.Errorf("reading from closed torrent: %w", err)
-                       return
-               }
-               r.t.cl.lock()
-               // I think there's a panic here caused by the Client being closed before obtaining this
-               // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI.
-               func() {
-                       // Just add exceptions already.
-                       defer r.t.cl.unlock()
-                       if r.t.closed.IsSet() {
-                               // Can't update because Torrent's piece order is removed from Client.
-                               return
-                       }
-                       // TODO: Just reset pieces in the readahead window. This might help
-                       // prevent thrashing with small caches and file and piece priorities.
-                       r.log(log.Fstr("error reading piece %d offset %d, %d bytes: %v",
-                               firstPieceIndex, firstPieceOffset, len(b1), err))
-                       if !r.t.updatePieceCompletion(firstPieceIndex) {
-                               r.log(log.Fstr("piece %d completion unchanged", firstPieceIndex))
-                       }
-                       // Update the rest of the piece completions in the readahead window, without alerting to
-                       // changes (since only the first piece, the one above, could have generated the read error
-                       // we're currently handling).
-                       if r.pieces.begin != firstPieceIndex {
-                               panic(fmt.Sprint(r.pieces.begin, firstPieceIndex))
-                       }
-                       for index := r.pieces.begin + 1; index < r.pieces.end; index++ {
-                               r.t.updatePieceCompletion(index)
-                       }
-               }()
+       var avail int64
+       avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
+       if avail == 0 || err != nil {
+               return
+       }
+       firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
+       firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength
+       b1 := b[:min(int64(len(b)), avail)]
+       // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to
+       // return an error so it wasn't noticed. We now try again if there's a storage cap otherwise
+       // convert it to io.UnexpectedEOF.
+       n, err = r.t.readAt(b1, r.torrentOffset(pos))
+       if n != 0 {
+               err = nil
+               return
+       }
+       panicif.Nil(err)
+       if r.t.closed.IsSet() {
+               err = fmt.Errorf("reading from closed torrent: %w", err)
+               return
+       }
+       attrs := [...]any{
+               "piece", firstPieceIndex,
+               "offset", firstPieceOffset,
+               "bytes", len(b1),
+               "err", err,
+       }
+       if r.t.hasStorageCap() {
+               r.slogger().Debug("error reading from capped storage", attrs[:]...)
+       } else {
+               r.slogger().Error("error reading", attrs[:]...)
+       }
+       r.afterReadError(firstPieceIndex)
+       if r.t.hasStorageCap() {
+               // Ensure params weren't modified (Go sux). Recurse to detect infinite loops.
+               return r.readOnceAt(ctx, b, pos)
+       }
+       // There should have been something available, avail != 0 here.
+       if err == io.EOF {
+               err = io.ErrUnexpectedEOF
+       }
+       return
+}
+
+func (r *reader) afterReadError(firstPieceIndex int) {
+       r.t.cl.lock()
+       // I think there's a panic here caused by the Client being closed before obtaining this
+       // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI.
+       // Just add exceptions already.
+       defer r.t.cl.unlock()
+       if r.t.closed.IsSet() {
+               // Can't update because Torrent's piece order is removed from Client.
+               return
+       }
+       // TODO: Just reset pieces in the readahead window. This might help
+       // prevent thrashing with small caches and file and piece priorities.
+       if !r.t.updatePieceCompletion(firstPieceIndex) {
+               r.logger().Levelf(log.Debug, "piece %d completion unchanged", firstPieceIndex)
+       }
+       // Update the rest of the piece completions in the readahead window, without alerting to
+       // changes (since only the first piece, the one above, could have generated the read error
+       // we're currently handling).
+       if r.pieces.begin != firstPieceIndex {
+               panic(fmt.Sprint(r.pieces.begin, firstPieceIndex))
+       }
+       for index := r.pieces.begin + 1; index < r.pieces.end; index++ {
+               r.t.updatePieceCompletion(index)
        }
 }
 
@@ -352,11 +376,15 @@ func (r *reader) Seek(off int64, whence int) (newPos int64, err error) {
        return
 }
 
-func (r *reader) log(m log.Msg) {
-       r.t.logger.LogLevel(log.Debug, m.Skip(1))
+func (r *reader) logger() log.Logger {
+       return r.t.logger
 }
 
 // Implementation inspired by https://news.ycombinator.com/item?id=27019613.
 func defaultReadaheadFunc(r ReadaheadContext) int64 {
        return r.CurrentPos - r.ContiguousReadStartPos
 }
+
+func (r *reader) slogger() *slog.Logger {
+       return r.t.slogger()
+}
index f7ccd42f0c68985f96c5f4c8f31a1fd95b8d6321..ddae026d7eb87a9b893d7231c0e03592cbc1196f 100644 (file)
@@ -19,6 +19,9 @@ type ClientImpl interface {
        OpenTorrent(ctx context.Context, info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error)
 }
 
+// Returning a negative cap, can we indicate there's no specific cap? If this is not-nil we use it
+// as a key into piece request order. The capped bool also needs to be true to be truly capped
+// though.
 type TorrentCapacity *func() (cap int64, capped bool)
 
 // Data storage bound to a torrent.