"errors"
"fmt"
"io"
+ "log/slog"
"sync"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2"
+ "github.com/anacrolix/missinggo/v2/panicif"
)
// Accesses Torrent data via a Client. Reads block until the data is available. Seeks and readahead
err = io.EOF
return
}
- for {
- var avail int64
- avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
- if avail == 0 {
- return
- }
- firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
- firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength
- b1 := missinggo.LimitLen(b, avail)
- n, err = r.t.readAt(b1, r.torrentOffset(pos))
- if n != 0 {
- err = nil
- return
- }
- if r.t.closed.IsSet() {
- err = fmt.Errorf("reading from closed torrent: %w", err)
- return
- }
- r.t.cl.lock()
- // I think there's a panic here caused by the Client being closed before obtaining this
- // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI.
- func() {
- // Just add exceptions already.
- defer r.t.cl.unlock()
- if r.t.closed.IsSet() {
- // Can't update because Torrent's piece order is removed from Client.
- return
- }
- // TODO: Just reset pieces in the readahead window. This might help
- // prevent thrashing with small caches and file and piece priorities.
- r.log(log.Fstr("error reading piece %d offset %d, %d bytes: %v",
- firstPieceIndex, firstPieceOffset, len(b1), err))
- if !r.t.updatePieceCompletion(firstPieceIndex) {
- r.log(log.Fstr("piece %d completion unchanged", firstPieceIndex))
- }
- // Update the rest of the piece completions in the readahead window, without alerting to
- // changes (since only the first piece, the one above, could have generated the read error
- // we're currently handling).
- if r.pieces.begin != firstPieceIndex {
- panic(fmt.Sprint(r.pieces.begin, firstPieceIndex))
- }
- for index := r.pieces.begin + 1; index < r.pieces.end; index++ {
- r.t.updatePieceCompletion(index)
- }
- }()
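+ // waitAvailable blocks until data at pos should be readable (tickling the client if it
+ // isn't) and reports how many bytes should be readable without blocking.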
+ var avail int64
+ avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
+ if avail == 0 || err != nil {
+ return
+ }
+ firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
+ firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength
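+ // Clamp the destination buffer to the available byte count. The min builtin (Go 1.21+)
+ // replaces missinggo.LimitLen here.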
+ b1 := b[:min(int64(len(b)), avail)]
+ // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to
+ // return an error, so it went unnoticed. We now try again if there's a storage cap,
+ // otherwise convert it to io.ErrUnexpectedEOF.
+ n, err = r.t.readAt(b1, r.torrentOffset(pos))
+ if n != 0 {
+ err = nil
+ return
+ }
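+ // n == 0 and we didn't return above, so by the io.ReaderAt contract err must be non-nil.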
+ panicif.Nil(err)
+ if r.t.closed.IsSet() {
+ err = fmt.Errorf("reading from closed torrent: %w", err)
+ return
+ }
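+ // Key/value pairs shared by the structured log calls below.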
+ attrs := [...]any{
+ "piece", firstPieceIndex,
+ "offset", firstPieceOffset,
+ "bytes", len(b1),
+ "err", err,
+ }
+ if r.t.hasStorageCap() {
+ r.slogger().Debug("error reading from capped storage", attrs[:]...)
+ } else {
+ r.slogger().Error("error reading", attrs[:]...)
+ }
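+ // Re-derive piece completion for the readahead window so the failed data can be fetched
+ // again.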
+ r.afterReadError(firstPieceIndex)
+ if r.t.hasStorageCap() {
+ // Params can't be declared immutable in Go, so recurse with the originals rather than
+ // loop; an unbounded retry then shows up as stack growth.
+ return r.readOnceAt(ctx, b, pos)
+ }
+ // avail != 0 at this point, so something should have been readable; an EOF from storage
+ // is unexpected.
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+ return
+}
+
+func (r *reader) afterReadError(firstPieceIndex int) {
+ r.t.cl.lock()
+ // I think there's a panic here caused by the Client being closed before obtaining this
+ // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle it occasionally in CI.
+ // Just add exceptions already.
+ defer r.t.cl.unlock()
+ if r.t.closed.IsSet() {
+ // Can't update because Torrent's piece order is removed from Client.
+ return
+ }
+ // TODO: Just reset pieces in the readahead window. This might help
+ // prevent thrashing with small caches and file and piece priorities.
+ if !r.t.updatePieceCompletion(firstPieceIndex) {
+ r.logger().Levelf(log.Debug, "piece %d completion unchanged", firstPieceIndex)
+ }
+ // Update the rest of the piece completions in the readahead window, without alerting to
+ // changes (since only the first piece, the one above, could have generated the read error
+ // we're currently handling).
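+ // Sanity check: the failing read should start at the first piece of the readahead window.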
+ if r.pieces.begin != firstPieceIndex {
+ panic(fmt.Sprint(r.pieces.begin, firstPieceIndex))
+ }
+ for index := r.pieces.begin + 1; index < r.pieces.end; index++ {
+ r.t.updatePieceCompletion(index)
}
}
return
}
-func (r *reader) log(m log.Msg) {
- r.t.logger.LogLevel(log.Debug, m.Skip(1))
+func (r *reader) logger() log.Logger {
+ return r.t.logger
}
// Implementation inspired by https://news.ycombinator.com/item?id=27019613.
func defaultReadaheadFunc(r ReadaheadContext) int64 {
return r.CurrentPos - r.ContiguousReadStartPos
}
+
+func (r *reader) slogger() *slog.Logger {
+ return r.t.slogger()
+}