From 644f8e53d300f9ede764a85d8187a930106c20a6 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 8 May 2025 16:01:43 +1000 Subject: [PATCH] Reinit storage reader on read errors --- client_test.go | 16 +++++------ reader.go | 78 +++++++++++++++++++++++++++++++++++++++++++------- t.go | 11 ++++--- torrent.go | 1 + 4 files changed, 82 insertions(+), 24 deletions(-) diff --git a/client_test.go b/client_test.go index 73ea3630..e1725f05 100644 --- a/client_test.go +++ b/client_test.go @@ -323,20 +323,20 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { return }()) leecherTorrent.AddClientPeer(seeder) - reader := leecherTorrent.NewReader() - t.Cleanup(func() { reader.Close() }) - reader.SetReadahead(0) - reader.SetResponsive() + rdr := leecherTorrent.NewReader() + t.Cleanup(func() { rdr.Close() }) + rdr.SetReadahead(0) + rdr.SetResponsive() b := make([]byte, 2) - _, err = reader.Seek(3, io.SeekStart) + _, err = rdr.Seek(3, io.SeekStart) require.NoError(t, err) - _, err = io.ReadFull(reader, b) + _, err = io.ReadFull(rdr, b) assert.Nil(t, err) assert.EqualValues(t, "lo", string(b)) - _, err = reader.Seek(11, io.SeekStart) + _, err = rdr.Seek(11, io.SeekStart) require.NoError(t, err) leecherTorrent.Drop() - n, err := reader.Read(b) + n, err := rdr.Read(b) qt.Assert(t, qt.Equals(err, errTorrentClosed)) assert.EqualValues(t, 0, n) } diff --git a/reader.go b/reader.go index ab6df459..ca5adebf 100644 --- a/reader.go +++ b/reader.go @@ -191,7 +191,7 @@ func (r *reader) readContext(ctx context.Context, b []byte) (n int, err error) { r.posChanged() r.mu.Unlock() } - n, err = r.readOnceAt(ctx, b, r.pos) + n, err = r.readAt(ctx, b, r.pos) if n == 0 { if err == nil && len(b) > 0 { panic("expected error") @@ -264,10 +264,6 @@ func (r *reader) torrentOffset(readerPos int64) int64 { // Performs at most one successful read to torrent storage. func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) { - if pos >= r.length { - err = io.EOF - return - } var avail int64 avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0) if avail == 0 || err != nil { @@ -279,6 +275,7 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to // return an error so it wasn't noticed. We now try again if there's a storage cap otherwise // convert it to io.UnexpectedEOF. + r.initStorageReader() n, err = r.storageReader.ReadAt(b1, r.torrentOffset(pos)) //n, err = r.t.readAt(b1, r.torrentOffset(pos)) if n != 0 { @@ -301,11 +298,53 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er } else { r.slogger().Error("error reading", attrs[:]...) } - r.afterReadError(firstPieceIndex) + return +} + +// Performs at most one successful read to torrent storage. Try reading, first with the storage +// reader we already have, then after resetting it (in case data moved for +// completed/incomplete/promoted etc.). Then try resetting the piece completions. Then after all +// that if the storage is supposed to be flaky, try all over again. TODO: Filter errors and set log +// levels appropriately. +func (r *reader) readAt(ctx context.Context, b []byte, pos int64) (n int, err error) { + if pos >= r.length { + err = io.EOF + return + } + n, err = r.readOnceAt(ctx, b, pos) + if err == nil { + return + } + r.slogger().Error("initial read failed", "err", err) + + err = r.clearStorageReader() + if err != nil { + err = fmt.Errorf("closing storage reader after first read failed: %w", err) + return + } + r.storageReader = nil + + n, err = r.readOnceAt(ctx, b, pos) + if err == nil { + return + } + r.slogger().Error("read failed after reader reset", "err", err) + + r.updatePieceCompletion(pos) + + n, err = r.readOnceAt(ctx, b, pos) + if err == nil { + return + } + r.slogger().Error("read failed after completion resync", "err", err) + if r.t.hasStorageCap() { - // Ensure params weren't modified (Go sux). Recurse to detect infinite loops. - return r.readOnceAt(ctx, b, pos) + // Ensure params weren't modified (Go sux). Recurse to detect infinite loops. TODO: I expect + // only some errors should pass through here, this might cause us to get stuck if we retry + // for any error. + return r.readAt(ctx, b, pos) } + // There should have been something available, avail != 0 here. if err == io.EOF { err = io.ErrUnexpectedEOF @@ -313,7 +352,9 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er return } -func (r *reader) afterReadError(firstPieceIndex int) { +// We pass pos in case we go ahead and implement multiple reads per ReadAt. +func (r *reader) updatePieceCompletion(pos int64) { + firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength) r.t.cl.lock() // I think there's a panic here caused by the Client being closed before obtaining this // lock. TestDropTorrentWithMmapStorageWhileHashing seems to tickle occasionally in CI. @@ -344,7 +385,7 @@ func (r *reader) Close() error { r.t.cl.lock() r.t.deleteReader(r) r.t.cl.unlock() - return r.storageReader.Close() + return r.clearStorageReader() } func (r *reader) posChanged() { @@ -394,3 +435,20 @@ func defaultReadaheadFunc(r ReadaheadContext) int64 { func (r *reader) slogger() *slog.Logger { return r.t.slogger() } + +func (r *reader) initStorageReader() { + if r.storageReader == nil { + r.storageReader = r.t.storageReader() + } +} + +func (r *reader) clearStorageReader() (err error) { + if r.storageReader != nil { + err = r.storageReader.Close() + if err != nil { + return + } + } + r.storageReader = nil + return +} diff --git a/t.go b/t.go index f862b553..8a07579f 100644 --- a/t.go +++ b/t.go @@ -40,12 +40,11 @@ func (t *Torrent) NewReader() Reader { func (t *Torrent) newReader(offset, length int64) Reader { r := reader{ - mu: t.cl.locker(), - t: t, - offset: offset, - length: length, - storageReader: t.storageReader(), - ctx: context.Background(), + mu: t.cl.locker(), + t: t, + offset: offset, + length: length, + ctx: context.Background(), } r.readaheadFunc = defaultReadaheadFunc t.addReader(&r) diff --git a/torrent.go b/torrent.go index d5605087..5e02c807 100644 --- a/torrent.go +++ b/torrent.go @@ -1656,6 +1656,7 @@ func (t *Torrent) openNewConns() (initiated int) { return } +// Pulls piece completion state from storage and performs any state updates if it changes. func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { p := t.piece(piece) uncached := t.pieceCompleteUncached(piece) -- 2.51.0