From 33c4b9d5bcb3417779a55701e58aeaab84d4e2dd Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 28 Apr 2025 11:18:43 +1000 Subject: [PATCH] Work towards allowing optimized torrent storage readers --- reader.go | 4 +- storage.go | 64 ++++++++++++++++++++++ storage/interface.go | 12 ++++ storage/piece-resource.go | 58 ++++++++++++++++---- storage/possum/possum-provider.go | 91 +++++++++++++++++++++++++++++-- storage/torrent-reader.go | 10 ++++ t.go | 11 ++-- torrent.go | 17 ++---- tracker_scraper.go | 1 + 9 files changed, 236 insertions(+), 32 deletions(-) create mode 100644 storage.go create mode 100644 storage/torrent-reader.go diff --git a/reader.go b/reader.go index f615d4fe..5630c424 100644 --- a/reader.go +++ b/reader.go @@ -57,6 +57,8 @@ type reader struct { // Adjust the read/seek window to handle Readers locked to File extents and the like. offset, length int64 + storageReader storageReader + // Function to dynamically calculate readahead. If nil, readahead is static. readaheadFunc ReadaheadFunc @@ -338,7 +340,7 @@ func (r *reader) Close() error { r.t.cl.lock() r.t.deleteReader(r) r.t.cl.unlock() - return nil + return r.storageReader.Close() } func (r *reader) posChanged() { diff --git a/storage.go b/storage.go new file mode 100644 index 00000000..40afbb87 --- /dev/null +++ b/storage.go @@ -0,0 +1,64 @@ +package torrent + +import ( + "io" + + "github.com/anacrolix/missinggo/v2/panicif" + + "github.com/anacrolix/torrent/storage" +) + +func (t *Torrent) storageReader() storageReader { + if t.storage.NewReader == nil { + return storagePieceReader{t: t} + } + return torrentStorageImplReader{ + implReader: t.storage.NewReader(), + t: t, + } +} + +type storagePieceReader struct { + t *Torrent +} + +func (me storagePieceReader) Close() error { + return nil +} + +func (me storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) { + for len(b) != 0 { + p := me.t.pieceForOffset(off) + p.waitNoPendingWrites() + var n1 int + n1, err = p.Storage().ReadAt(b, off-p.Info().Offset()) + if n1 == 0 { + panicif.Nil(err) + break + } + off += int64(n1) + n += n1 + b = b[n1:] + } + return +} + +type storageReader interface { + io.ReaderAt + io.Closer +} + +type torrentStorageImplReader struct { + implReader storage.TorrentReader + t *Torrent +} + +func (me torrentStorageImplReader) ReadAt(p []byte, off int64) (n int, err error) { + // TODO: Should waitNoPendingWrites take a region? + me.t.pieceForOffset(off).waitNoPendingWrites() + return me.implReader.ReadAt(p, off) +} + +func (me torrentStorageImplReader) Close() error { + return me.implReader.Close() +} diff --git a/storage/interface.go b/storage/interface.go index edc2e9f3..3ef528b2 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -37,6 +37,8 @@ type TorrentImpl struct { // to determine the storage for torrents sharing the same function pointer, and mutated in // place. Capacity TorrentCapacity + + NewReader func() TorrentReader } // Interacts with torrent piece data. Optional interfaces to implement include: @@ -73,3 +75,13 @@ type Completion struct { type SelfHashing interface { SelfHash() (metainfo.Hash, error) } + +// Piece supports dedicated reader. +type PieceReaderer interface { + NewReader() PieceReader +} + +type PieceReader interface { + io.ReaderAt + io.Closer +} diff --git a/storage/piece-resource.go b/storage/piece-resource.go index 4ab8c07e..d7538a24 100644 --- a/storage/piece-resource.go +++ b/storage/piece-resource.go @@ -88,6 +88,10 @@ type ConsecutiveChunkReader interface { ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) } +type ChunksReaderer interface { + ChunksReader(prefix string) (PieceReader, error) +} + type PrefixDeleter interface { DeletePrefix(prefix string) error } @@ -238,17 +242,14 @@ func (s piecePerResourcePiece) MarkNotComplete() error { return s.completedInstance().Delete() } -func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { - s.mu.RLock() - defer s.mu.RUnlock() - if s.mustIsComplete() { - if s.hasMovePrefix() { - chunks := s.getChunks(s.completedDirPath()) - return chunks.ReadAt(b, off) - } - return s.completedInstance().ReadAt(b, off) +func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) { + r, err := s.PieceReader() + if err != nil { + return } - return s.getChunks(s.incompleteDirPath()).ReadAt(b, off) + defer r.Close() + n, err = r.ReadAt(b, off) + return } func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) { @@ -377,3 +378,40 @@ func (me piecePerResourcePiece) hasMovePrefix() bool { _, ok := me.rp.(MovePrefixer) return ok } + +func (s piecePerResourcePiece) getChunksReader(prefix string) (PieceReader, error) { + if opt, ok := s.rp.(ChunksReaderer); ok { + return opt.ChunksReader(prefix) + } + return chunkPieceReader{s.getChunks(prefix)}, nil +} + +func (s piecePerResourcePiece) PieceReader() (PieceReader, error) { + s.mu.RLock() + defer s.mu.RUnlock() + if s.mustIsComplete() { + if s.hasMovePrefix() { + return s.getChunksReader(s.completedDirPath()) + } + return instancePieceReader{s.completedInstance()}, nil + } + return s.getChunksReader(s.incompleteDirPath()) +} + +type instancePieceReader struct { + resource.Instance +} + +func (instancePieceReader) Close() error { + return nil +} + +type chunkPieceReader struct { + chunks +} + +func (chunkPieceReader) Close() error { + return nil +} + +// TODO: Make an embedded Closer using reflect? diff --git a/storage/possum/possum-provider.go b/storage/possum/possum-provider.go index 12dc83d4..b30949e6 100644 --- a/storage/possum/possum-provider.go +++ b/storage/possum/possum-provider.go @@ -16,15 +16,98 @@ import ( ) // Extends possum resource.Provider with an efficient implementation of torrent -// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity. TODO: Add a MarkComplete -// method that renames incomplete chunks rather than writing them to a single giant key and deleting -// them. +// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity. type Provider struct { possumResource.Provider Logger log.Logger } -var _ storage.ConsecutiveChunkReader = Provider{} +var _ interface { + storage.ConsecutiveChunkReader + storage.ChunksReaderer +} = Provider{} + +type chunkReader struct { + r possum.Reader + values []consecutiveValue + keys []int64 +} + +func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) { + //TODO implement me + panic("implement me") +} + +func (c chunkReader) Close() error { + //TODO implement me + panic("implement me") +} + +type ChunkReader interface { + io.ReaderAt + io.Closer +} + +// TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid +// trying to read discontinuous or incomplete sequences of chunks? +func (p Provider) ChunksReader(prefix string) (ret storage.PieceReader, err error) { + p.Logger.Levelf(log.Debug, "ChunkReader(%q)", prefix) + //debug.PrintStack() + pr, err := p.Handle.NewReader() + if err != nil { + return + } + defer func() { + if err != nil { + pr.End() + } + }() + items, err := pr.ListItems(prefix) + if err != nil { + return + } + keys := make([]int64, 0, len(items)) + for _, item := range items { + var i int64 + offsetStr := item.Key + i, err = strconv.ParseInt(offsetStr, 10, 64) + if err != nil { + err = fmt.Errorf("failed to parse offset %q: %w", offsetStr, err) + return + } + keys = append(keys, i) + } + sort.Sort(keySorter[possum.Item, int64]{items, keys}) + offset := int64(0) + consValues := make([]consecutiveValue, 0, len(items)) + for i, item := range items { + itemOffset := keys[i] + if itemOffset+item.Stat.Size() <= offset { + // This item isn't needed + continue + } + var v possum.Value + v, err = pr.Add(prefix + item.Key) + if err != nil { + return + } + consValues = append(consValues, consecutiveValue{ + pv: v, + offset: itemOffset, + size: item.Stat.Size(), + }) + offset = itemOffset + item.Stat.Size() + } + err = pr.Begin() + if err != nil { + return + } + ret = chunkReader{ + r: pr, + values: consValues, + } + return +} // TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid // trying to read discontinuous or incomplete sequences of chunks? diff --git a/storage/torrent-reader.go b/storage/torrent-reader.go new file mode 100644 index 00000000..c42fdf72 --- /dev/null +++ b/storage/torrent-reader.go @@ -0,0 +1,10 @@ +package storage + +import ( + "io" +) + +type TorrentReader interface { + io.ReaderAt + io.Closer +} diff --git a/t.go b/t.go index 8a07579f..f862b553 100644 --- a/t.go +++ b/t.go @@ -40,11 +40,12 @@ func (t *Torrent) NewReader() Reader { func (t *Torrent) newReader(offset, length int64) Reader { r := reader{ - mu: t.cl.locker(), - t: t, - offset: offset, - length: length, - ctx: context.Background(), + mu: t.cl.locker(), + t: t, + offset: offset, + length: length, + storageReader: t.storageReader(), + ctx: context.Background(), } r.readaheadFunc = defaultReadaheadFunc t.addReader(&r) diff --git a/torrent.go b/torrent.go index 69b2e72f..f8254dbd 100644 --- a/torrent.go +++ b/torrent.go @@ -30,6 +30,7 @@ import ( "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" @@ -1546,6 +1547,7 @@ func (t *Torrent) logPieceRequestOrder() { // Returns the range of pieces [begin, end) that contains the extent of bytes. func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) { + fmt.Println("byteRegionPieces", off, size) if off >= t.length() { return } @@ -1687,18 +1689,9 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { // Non-blocking read. Client lock is not required. func (t *Torrent) readAt(b []byte, off int64) (n int, err error) { - for len(b) != 0 { - p := &t.pieces[off/t.info.PieceLength] - p.waitNoPendingWrites() - var n1 int - n1, err = p.Storage().ReadAt(b, off-p.Info().Offset()) - if n1 == 0 { - break - } - off += int64(n1) - n += n1 - b = b[n1:] - } + r := t.storageReader() + n, err = r.ReadAt(b, off) + panicif.Err(r.Close()) return } diff --git a/tracker_scraper.go b/tracker_scraper.go index b8d72e98..3e95917e 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -12,6 +12,7 @@ import ( "time" "github.com/anacrolix/dht/v2/krpc" + "github.com/anacrolix/torrent/tracker" ) -- 2.51.0