Work towards allowing optimized torrent storage readers
author    Matt Joiner <anacrolix@gmail.com>
          Mon, 28 Apr 2025 01:18:43 +0000 (11:18 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Wed, 7 May 2025 09:49:10 +0000 (19:49 +1000)
reader.go
storage.go [new file with mode: 0644]
storage/interface.go
storage/piece-resource.go
storage/possum/possum-provider.go
storage/torrent-reader.go [new file with mode: 0644]
t.go
torrent.go
tracker_scraper.go
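
The direction of the commit: storage.TorrentImpl gains an optional NewReader hook returning a storage.TorrentReader (an io.ReaderAt plus io.Closer), and torrent readers use it when present instead of the per-piece ReadAt path. A minimal sketch of a backend opting in, assuming a hypothetical single-file layout (fileBackedTorrent and erroringReader are illustrative names, not part of this commit):

package example

import (
	"os"

	"github.com/anacrolix/torrent/storage"
)

// fileBackedTorrent is a hypothetical backend that keeps a torrent's data in
// one flat file.
type fileBackedTorrent struct {
	path string
}

func (t fileBackedTorrent) torrentImpl() storage.TorrentImpl {
	return storage.TorrentImpl{
		// Piece, Close and the other fields are omitted for brevity.
		NewReader: func() storage.TorrentReader {
			f, err := os.Open(t.path)
			if err != nil {
				// NewReader has no error return in this commit, so failures
				// have to surface from the reader itself.
				return erroringReader{err}
			}
			// *os.File already satisfies io.ReaderAt and io.Closer.
			return f
		},
	}
}

// erroringReader defers an open error to the first ReadAt call.
type erroringReader struct{ err error }

func (r erroringReader) ReadAt([]byte, int64) (int, error) { return 0, r.err }
func (r erroringReader) Close() error                      { return nil }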

index f615d4fe8ecd33e6537606ffab0be86502d77eb6..5630c424284f2e20c7f136762de733fc5b4e7af7 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -57,6 +57,8 @@ type reader struct {
        // Adjust the read/seek window to handle Readers locked to File extents and the like.
        offset, length int64
 
+       storageReader storageReader
+
        // Function to dynamically calculate readahead. If nil, readahead is static.
        readaheadFunc ReadaheadFunc
 
@@ -338,7 +340,7 @@ func (r *reader) Close() error {
        r.t.cl.lock()
        r.t.deleteReader(r)
        r.t.cl.unlock()
-       return nil
+       return r.storageReader.Close()
 }
 
 func (r *reader) posChanged() {
diff --git a/storage.go b/storage.go
new file mode 100644 (file)
index 0000000..40afbb8
--- /dev/null
@@ -0,0 +1,64 @@
+package torrent
+
+import (
+       "io"
+
+       "github.com/anacrolix/missinggo/v2/panicif"
+
+       "github.com/anacrolix/torrent/storage"
+)
+
+func (t *Torrent) storageReader() storageReader {
+       if t.storage.NewReader == nil {
+               return storagePieceReader{t: t}
+       }
+       return torrentStorageImplReader{
+               implReader: t.storage.NewReader(),
+               t:          t,
+       }
+}
+
+type storagePieceReader struct {
+       t *Torrent
+}
+
+func (me storagePieceReader) Close() error {
+       return nil
+}
+
+func (me storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) {
+       for len(b) != 0 {
+               p := me.t.pieceForOffset(off)
+               p.waitNoPendingWrites()
+               var n1 int
+               n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
+               if n1 == 0 {
+                       panicif.Nil(err)
+                       break
+               }
+               off += int64(n1)
+               n += n1
+               b = b[n1:]
+       }
+       return
+}
+
+type storageReader interface {
+       io.ReaderAt
+       io.Closer
+}
+
+type torrentStorageImplReader struct {
+       implReader storage.TorrentReader
+       t          *Torrent
+}
+
+func (me torrentStorageImplReader) ReadAt(p []byte, off int64) (n int, err error) {
+       // TODO: Should waitNoPendingWrites take a region?
+       me.t.pieceForOffset(off).waitNoPendingWrites()
+       return me.implReader.ReadAt(p, off)
+}
+
+func (me torrentStorageImplReader) Close() error {
+       return me.implReader.Close()
+}
index edc2e9f32082b90e1cdfbe8ea5489c4a73976037..3ef528b27903517bfd523ff93fc809b26d7c23a0 100644 (file)
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -37,6 +37,8 @@ type TorrentImpl struct {
        // to determine the storage for torrents sharing the same function pointer, and mutated in
        // place.
        Capacity TorrentCapacity
+
+       NewReader func() TorrentReader
 }
 
 // Interacts with torrent piece data. Optional interfaces to implement include:
@@ -73,3 +75,13 @@ type Completion struct {
 type SelfHashing interface {
        SelfHash() (metainfo.Hash, error)
 }
+
+// Piece supports a dedicated reader.
+type PieceReaderer interface {
+       NewReader() PieceReader
+}
+
+type PieceReader interface {
+       io.ReaderAt
+       io.Closer
+}
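
On the piece side, PieceReaderer is an optional interface a piece implementation can add to hand out dedicated per-piece readers. A minimal sketch, assuming a hypothetical in-memory piece (memoryPiece is an illustrative name, not part of this commit):

import (
	"bytes"

	"github.com/anacrolix/torrent/storage"
)

// memoryPiece is a hypothetical piece implementation backed by a byte slice.
type memoryPiece struct {
	data []byte
}

// NewReader implements storage.PieceReaderer.
func (p memoryPiece) NewReader() storage.PieceReader {
	return memoryPieceReader{bytes.NewReader(p.data)}
}

type memoryPieceReader struct {
	*bytes.Reader // provides ReadAt
}

// Close is a no-op; nothing is held open for an in-memory piece.
func (memoryPieceReader) Close() error { return nil }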
index 4ab8c07eddae01f132eeadf564b2d21b768b7008..d7538a24c8e89b4296391f9af80b61ec1b8a40e0 100644 (file)
--- a/storage/piece-resource.go
+++ b/storage/piece-resource.go
@@ -88,6 +88,10 @@ type ConsecutiveChunkReader interface {
        ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
 }
 
+type ChunksReaderer interface {
+       ChunksReader(prefix string) (PieceReader, error)
+}
+
 type PrefixDeleter interface {
        DeletePrefix(prefix string) error
 }
@@ -238,17 +242,14 @@ func (s piecePerResourcePiece) MarkNotComplete() error {
        return s.completedInstance().Delete()
 }
 
-func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
-       s.mu.RLock()
-       defer s.mu.RUnlock()
-       if s.mustIsComplete() {
-               if s.hasMovePrefix() {
-                       chunks := s.getChunks(s.completedDirPath())
-                       return chunks.ReadAt(b, off)
-               }
-               return s.completedInstance().ReadAt(b, off)
+func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
+       r, err := s.PieceReader()
+       if err != nil {
+               return
        }
-       return s.getChunks(s.incompleteDirPath()).ReadAt(b, off)
+       defer r.Close()
+       n, err = r.ReadAt(b, off)
+       return
 }
 
 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
@@ -377,3 +378,40 @@ func (me piecePerResourcePiece) hasMovePrefix() bool {
        _, ok := me.rp.(MovePrefixer)
        return ok
 }
+
+func (s piecePerResourcePiece) getChunksReader(prefix string) (PieceReader, error) {
+       if opt, ok := s.rp.(ChunksReaderer); ok {
+               return opt.ChunksReader(prefix)
+       }
+       return chunkPieceReader{s.getChunks(prefix)}, nil
+}
+
+func (s piecePerResourcePiece) PieceReader() (PieceReader, error) {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       if s.mustIsComplete() {
+               if s.hasMovePrefix() {
+                       return s.getChunksReader(s.completedDirPath())
+               }
+               return instancePieceReader{s.completedInstance()}, nil
+       }
+       return s.getChunksReader(s.incompleteDirPath())
+}
+
+type instancePieceReader struct {
+       resource.Instance
+}
+
+func (instancePieceReader) Close() error {
+       return nil
+}
+
+type chunkPieceReader struct {
+       chunks
+}
+
+func (chunkPieceReader) Close() error {
+       return nil
+}
+
+// TODO: Make an embedded Closer using reflect?
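
One way to resolve that trailing TODO without reflection: a single wrapper that adds a no-op Close to any io.ReaderAt, replacing both instancePieceReader and chunkPieceReader. A sketch (nopCloserPieceReader is an illustrative name, not part of this commit):

// nopCloserPieceReader adapts any io.ReaderAt into a PieceReader whose Close
// does nothing.
type nopCloserPieceReader struct {
	io.ReaderAt
}

func (nopCloserPieceReader) Close() error { return nil }

// PieceReader could then return:
//   nopCloserPieceReader{s.completedInstance()}
//   nopCloserPieceReader{s.getChunks(prefix)}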
index 12dc83d4b1a120c4ced112143f0993e973ea5f81..b30949e60e947711203361e49229945809ea315a 100644 (file)
--- a/storage/possum/possum-provider.go
+++ b/storage/possum/possum-provider.go
@@ -16,15 +16,98 @@ import (
 )
 
 // Extends possum resource.Provider with an efficient implementation of torrent
-// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity. TODO: Add a MarkComplete
-// method that renames incomplete chunks rather than writing them to a single giant key and deleting
-// them.
+// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity.
 type Provider struct {
        possumResource.Provider
        Logger log.Logger
 }
 
-var _ storage.ConsecutiveChunkReader = Provider{}
+var _ interface {
+       storage.ConsecutiveChunkReader
+       storage.ChunksReaderer
+} = Provider{}
+
+type chunkReader struct {
+       r      possum.Reader
+       values []consecutiveValue
+       keys   []int64
+}
+
+func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (c chunkReader) Close() error {
+       //TODO implement me
+       panic("implement me")
+}
+
+type ChunkReader interface {
+       io.ReaderAt
+       io.Closer
+}
+
+// TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
+// trying to read discontinuous or incomplete sequences of chunks?
+func (p Provider) ChunksReader(prefix string) (ret storage.PieceReader, err error) {
+       p.Logger.Levelf(log.Debug, "ChunksReader(%q)", prefix)
+       //debug.PrintStack()
+       pr, err := p.Handle.NewReader()
+       if err != nil {
+               return
+       }
+       defer func() {
+               if err != nil {
+                       pr.End()
+               }
+       }()
+       items, err := pr.ListItems(prefix)
+       if err != nil {
+               return
+       }
+       keys := make([]int64, 0, len(items))
+       for _, item := range items {
+               var i int64
+               offsetStr := item.Key
+               i, err = strconv.ParseInt(offsetStr, 10, 64)
+               if err != nil {
+                       err = fmt.Errorf("failed to parse offset %q: %w", offsetStr, err)
+                       return
+               }
+               keys = append(keys, i)
+       }
+       sort.Sort(keySorter[possum.Item, int64]{items, keys})
+       offset := int64(0)
+       consValues := make([]consecutiveValue, 0, len(items))
+       for i, item := range items {
+               itemOffset := keys[i]
+               if itemOffset+item.Stat.Size() <= offset {
+                       // This item isn't needed
+                       continue
+               }
+               var v possum.Value
+               v, err = pr.Add(prefix + item.Key)
+               if err != nil {
+                       return
+               }
+               consValues = append(consValues, consecutiveValue{
+                       pv:     v,
+                       offset: itemOffset,
+                       size:   item.Stat.Size(),
+               })
+               offset = itemOffset + item.Stat.Size()
+       }
+       err = pr.Begin()
+       if err != nil {
+               return
+       }
+       ret = chunkReader{
+               r:      pr,
+               values: consValues,
+       }
+       return
+}
 
 // TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
 // trying to read discontinuous or incomplete sequences of chunks?
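
The ReadAt and Close stubs in chunkReader above still need to stitch the collected values back together. A rough sketch of one shape they could take, assuming possum.Value satisfies io.ReaderAt and that Reader.End releases the read snapshot (assumptions, not confirmed by this diff; the values are already offset-sorted when the chunkReader is built):

func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) {
	for _, v := range c.values {
		if off >= v.offset+v.size {
			// This value ends before the requested offset.
			continue
		}
		if off < v.offset {
			// Gap between stored chunks: the data isn't there.
			err = io.ErrUnexpectedEOF
			return
		}
		var n1 int
		n1, err = v.pv.ReadAt(p, off-v.offset)
		n += n1
		off += int64(n1)
		p = p[n1:]
		if len(p) == 0 || (err != nil && err != io.EOF) {
			return
		}
		// EOF within one value just means we move on to the next one.
		err = nil
	}
	if len(p) != 0 {
		err = io.EOF
	}
	return
}

func (c chunkReader) Close() error {
	c.r.End()
	return nil
}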
diff --git a/storage/torrent-reader.go b/storage/torrent-reader.go
new file mode 100644 (file)
index 0000000..c42fdf7
--- /dev/null
@@ -0,0 +1,10 @@
+package storage
+
+import (
+       "io"
+)
+
+type TorrentReader interface {
+       io.ReaderAt
+       io.Closer
+}
diff --git a/t.go b/t.go
index 8a07579f403823de3c59eb99f5f0ed1d70575af0..f862b553b4af30c02065f295b0f062249d544237 100644 (file)
--- a/t.go
+++ b/t.go
@@ -40,11 +40,12 @@ func (t *Torrent) NewReader() Reader {
 
 func (t *Torrent) newReader(offset, length int64) Reader {
        r := reader{
-               mu:     t.cl.locker(),
-               t:      t,
-               offset: offset,
-               length: length,
-               ctx:    context.Background(),
+               mu:            t.cl.locker(),
+               t:             t,
+               offset:        offset,
+               length:        length,
+               storageReader: t.storageReader(),
+               ctx:           context.Background(),
        }
        r.readaheadFunc = defaultReadaheadFunc
        t.addReader(&r)
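
From the caller's side nothing changes except that closing a Reader now also closes its dedicated storage reader (see the reader.go hunk above), so readers should not be leaked. A hypothetical usage sketch, given a *Torrent t:

r := t.NewReader()
defer r.Close() // now also closes the underlying storageReader
if _, err := r.Seek(1<<20, io.SeekStart); err != nil {
	// handle the seek error
}
buf := make([]byte, 32<<10)
n, err := r.Read(buf)
_, _ = n, err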
index 69b2e72f46ed6d2b0c2e118a7835fa33c91d0675..f8254dbddcf375fb531282a6f6c7018a095cb7e3 100644 (file)
--- a/torrent.go
+++ b/torrent.go
@@ -30,6 +30,7 @@ import (
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/missinggo/v2/panicif"
        "github.com/anacrolix/missinggo/v2/pubsub"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
@@ -1546,6 +1547,7 @@ func (t *Torrent) logPieceRequestOrder() {
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
+       fmt.Println("byteRegionPieces", off, size)
        if off >= t.length() {
                return
        }
@@ -1687,18 +1689,9 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
 
 // Non-blocking read. Client lock is not required.
 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
-       for len(b) != 0 {
-               p := &t.pieces[off/t.info.PieceLength]
-               p.waitNoPendingWrites()
-               var n1 int
-               n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
-               if n1 == 0 {
-                       break
-               }
-               off += int64(n1)
-               n += n1
-               b = b[n1:]
-       }
+       r := t.storageReader()
+       n, err = r.ReadAt(b, off)
+       panicif.Err(r.Close())
        return
 }
 
index b8d72e986e87f4e2d93f62357c7ef83d6bdf43c1..3e95917ea264fdca4d06e3171633a99a583db970 100644 (file)
--- a/tracker_scraper.go
+++ b/tracker_scraper.go
@@ -12,6 +12,7 @@ import (
        "time"
 
        "github.com/anacrolix/dht/v2/krpc"
+
        "github.com/anacrolix/torrent/tracker"
 )