From 589b1ebd9f3b8989537178ad4c762b49df64af4d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 28 Apr 2025 11:36:11 +1000 Subject: [PATCH] Refactor Reader.readOnceAt Looks like there was some errors that could be dropped. We also handle a few corner cases and ReadAt returning 0 being handled non-canonically. --- bad_storage.go => bad-storage.go | 6 +- reader.go | 122 +++++++++++++++++++------------ storage/interface.go | 3 + 3 files changed, 83 insertions(+), 48 deletions(-) rename bad_storage.go => bad-storage.go (92%) diff --git a/bad_storage.go b/bad-storage.go similarity index 92% rename from bad_storage.go rename to bad-storage.go index 7a2537a6..cee76865 100644 --- a/bad_storage.go +++ b/bad-storage.go @@ -20,8 +20,12 @@ func (bs badStorage) OpenTorrent( *metainfo.Info, metainfo.Hash, ) (storage.TorrentImpl, error) { + capFunc := func() (cap int64, capped bool) { + return -1, true + } return storage.TorrentImpl{ - Piece: bs.Piece, + Piece: bs.Piece, + Capacity: &capFunc, }, nil } diff --git a/reader.go b/reader.go index 2af99ce2..f615d4fe 100644 --- a/reader.go +++ b/reader.go @@ -5,10 +5,12 @@ import ( "errors" "fmt" "io" + "log/slog" "sync" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/panicif" ) // Accesses Torrent data via a Client. Reads block until the data is available. Seeks and readahead @@ -261,51 +263,73 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er err = io.EOF return } - for { - var avail int64 - avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0) - if avail == 0 { - return - } - firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength) - firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength - b1 := missinggo.LimitLen(b, avail) - n, err = r.t.readAt(b1, r.torrentOffset(pos)) - if n != 0 { - err = nil - return - } - if r.t.closed.IsSet() { - err = fmt.Errorf("reading from closed torrent: %w", err) - return - } - r.t.cl.lock() - // I think there's a panic here caused by the Client being closed before obtaining this - // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI. - func() { - // Just add exceptions already. - defer r.t.cl.unlock() - if r.t.closed.IsSet() { - // Can't update because Torrent's piece order is removed from Client. - return - } - // TODO: Just reset pieces in the readahead window. This might help - // prevent thrashing with small caches and file and piece priorities. - r.log(log.Fstr("error reading piece %d offset %d, %d bytes: %v", - firstPieceIndex, firstPieceOffset, len(b1), err)) - if !r.t.updatePieceCompletion(firstPieceIndex) { - r.log(log.Fstr("piece %d completion unchanged", firstPieceIndex)) - } - // Update the rest of the piece completions in the readahead window, without alerting to - // changes (since only the first piece, the one above, could have generated the read error - // we're currently handling). - if r.pieces.begin != firstPieceIndex { - panic(fmt.Sprint(r.pieces.begin, firstPieceIndex)) - } - for index := r.pieces.begin + 1; index < r.pieces.end; index++ { - r.t.updatePieceCompletion(index) - } - }() + var avail int64 + avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0) + if avail == 0 || err != nil { + return + } + firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength) + firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength + b1 := b[:min(int64(len(b)), avail)] + // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to + // return an error so it wasn't noticed. We now try again if there's a storage cap otherwise + // convert it to io.UnexpectedEOF. + n, err = r.t.readAt(b1, r.torrentOffset(pos)) + if n != 0 { + err = nil + return + } + panicif.Nil(err) + if r.t.closed.IsSet() { + err = fmt.Errorf("reading from closed torrent: %w", err) + return + } + attrs := [...]any{ + "piece", firstPieceIndex, + "offset", firstPieceOffset, + "bytes", len(b1), + "err", err, + } + if r.t.hasStorageCap() { + r.slogger().Debug("error reading from capped storage", attrs[:]...) + } else { + r.slogger().Error("error reading", attrs[:]...) + } + r.afterReadError(firstPieceIndex) + if r.t.hasStorageCap() { + // Ensure params weren't modified (Go sux). Recurse to detect infinite loops. + return r.readOnceAt(ctx, b, pos) + } + // There should have been something available, avail != 0 here. + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return +} + +func (r *reader) afterReadError(firstPieceIndex int) { + r.t.cl.lock() + // I think there's a panic here caused by the Client being closed before obtaining this + // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI. + // Just add exceptions already. + defer r.t.cl.unlock() + if r.t.closed.IsSet() { + // Can't update because Torrent's piece order is removed from Client. + return + } + // TODO: Just reset pieces in the readahead window. This might help + // prevent thrashing with small caches and file and piece priorities. + if !r.t.updatePieceCompletion(firstPieceIndex) { + r.logger().Levelf(log.Debug, "piece %d completion unchanged", firstPieceIndex) + } + // Update the rest of the piece completions in the readahead window, without alerting to + // changes (since only the first piece, the one above, could have generated the read error + // we're currently handling). + if r.pieces.begin != firstPieceIndex { + panic(fmt.Sprint(r.pieces.begin, firstPieceIndex)) + } + for index := r.pieces.begin + 1; index < r.pieces.end; index++ { + r.t.updatePieceCompletion(index) } } @@ -352,11 +376,15 @@ func (r *reader) Seek(off int64, whence int) (newPos int64, err error) { return } -func (r *reader) log(m log.Msg) { - r.t.logger.LogLevel(log.Debug, m.Skip(1)) +func (r *reader) logger() log.Logger { + return r.t.logger } // Implementation inspired by https://news.ycombinator.com/item?id=27019613. func defaultReadaheadFunc(r ReadaheadContext) int64 { return r.CurrentPos - r.ContiguousReadStartPos } + +func (r *reader) slogger() *slog.Logger { + return r.t.slogger() +} diff --git a/storage/interface.go b/storage/interface.go index f7ccd42f..ddae026d 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -19,6 +19,9 @@ type ClientImpl interface { OpenTorrent(ctx context.Context, info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) } +// Returning a negative cap, can we indicate there's no specific cap? If this is not-nil we use it +// as a key into piece request order. The capped bool also needs to be true to be truly capped +// though. type TorrentCapacity *func() (cap int64, capped bool) // Data storage bound to a torrent. -- 2.48.1