]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Finish the storage reader implementation
authorMatt Joiner <anacrolix@gmail.com>
Wed, 7 May 2025 11:27:00 +0000 (21:27 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 7 May 2025 11:27:00 +0000 (21:27 +1000)
reader.go
storage.go
storage/interface.go
storage/piece-resource.go
storage/possum/possum-provider.go
storage/wrappers.go
torrent.go

index 5630c424284f2e20c7f136762de733fc5b4e7af7..0a1c2297965faac3ff1dfe749f68f365d68b31bb 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -276,7 +276,8 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er
        // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to
        // return an error so it wasn't noticed. We now try again if there's a storage cap otherwise
        // convert it to io.UnexpectedEOF.
-       n, err = r.t.readAt(b1, r.torrentOffset(pos))
+       n, err = r.storageReader.ReadAt(b1, r.torrentOffset(pos))
+       //n, err = r.t.readAt(b1, r.torrentOffset(pos))
        if n != 0 {
                err = nil
                return
index 40afbb871f7992e641d283806993b8f32f928c18..1acfb8efef1bd260790f24ba725970d8f5f2306c 100644 (file)
@@ -10,7 +10,7 @@ import (
 
 func (t *Torrent) storageReader() storageReader {
        if t.storage.NewReader == nil {
-               return storagePieceReader{t: t}
+               return &storagePieceReader{t: t}
        }
        return torrentStorageImplReader{
                implReader: t.storage.NewReader(),
@@ -18,20 +18,44 @@ func (t *Torrent) storageReader() storageReader {
        }
 }
 
+// This wraps per-piece storage as a whole-torrent storageReader.
 type storagePieceReader struct {
-       t *Torrent
+       t       *Torrent
+       pr      storage.PieceReader
+       prIndex int
 }
 
-func (me storagePieceReader) Close() error {
-       return nil
+func (me *storagePieceReader) Close() (err error) {
+       if me.pr != nil {
+               err = me.pr.Close()
+       }
+       return
+}
+
+func (me *storagePieceReader) getReaderAt(p *Piece) (err error) {
+       if me.pr != nil {
+               if me.prIndex == p.index {
+                       return
+               }
+               panicif.Err(me.pr.Close())
+               me.pr = nil
+       }
+       ps := p.Storage()
+       me.prIndex = p.index
+       me.pr, err = ps.NewReader()
+       return
 }
 
-func (me storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) {
+func (me *storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) {
        for len(b) != 0 {
                p := me.t.pieceForOffset(off)
                p.waitNoPendingWrites()
                var n1 int
-               n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
+               err = me.getReaderAt(p)
+               if err != nil {
+                       return
+               }
+               n1, err = me.pr.ReadAt(b, off-p.Info().Offset())
                if n1 == 0 {
                        panicif.Nil(err)
                        break
@@ -48,6 +72,7 @@ type storageReader interface {
        io.Closer
 }
 
+// This wraps a storage impl provided TorrentReader as a storageReader.
 type torrentStorageImplReader struct {
        implReader storage.TorrentReader
        t          *Torrent
index 3ef528b27903517bfd523ff93fc809b26d7c23a0..b25e360d3abbd2319369e5a06cac6b4be916bd39 100644 (file)
@@ -38,13 +38,15 @@ type TorrentImpl struct {
        // place.
        Capacity TorrentCapacity
 
-       NewReader func() TorrentReader
+       NewReader      func() TorrentReader
+       NewPieceReader func(p Piece) PieceReader
 }
 
-// Interacts with torrent piece data. Optional interfaces to implement include:
+// Interacts with torrent piece data. Optional interfaces to implement include://
 //
-//     io.WriterTo, such as when a piece supports a more efficient way to write out incomplete chunks.
-//     SelfHashing, such as when a piece supports a more efficient way to hash its contents.
+//             io.WriterTo, such as when a piece supports a more efficient way to write out incomplete chunks.
+//             SelfHashing, such as when a piece supports a more efficient way to hash its contents.
+//             PieceReaderer when it has a stateful Reader interface that is more efficient.
 type PieceImpl interface {
        // These interfaces are not as strict as normally required. They can
        // assume that the parameters are appropriate for the dimensions of the
@@ -71,14 +73,15 @@ type Completion struct {
        Complete bool
 }
 
-// Allows a storage backend to override hashing (i.e. if it can do it more efficiently than the torrent client can)
+// Allows a storage backend to override hashing (i.e. if it can do it more efficiently than the
+// torrent client can).
 type SelfHashing interface {
        SelfHash() (metainfo.Hash, error)
 }
 
 // Piece supports dedicated reader.
 type PieceReaderer interface {
-       NewReader() PieceReader
+       NewReader() (PieceReader, error)
 }
 
 type PieceReader interface {
index d7538a24c8e89b4296391f9af80b61ec1b8a40e0..8e73b82933a647abd2fee3cfaff14743f440e7fd 100644 (file)
@@ -60,11 +60,12 @@ func (s piecePerResource) OpenTorrent(
                s,
                make([]sync.RWMutex, info.NumPieces()),
        }
-       return TorrentImpl{
+       ret := TorrentImpl{
                PieceWithHash: t.Piece,
                Close:         t.Close,
                Capacity:      s.opts.Capacity,
-       }, nil
+       }
+       return ret, nil
 }
 
 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl {
@@ -106,7 +107,10 @@ type piecePerResourcePiece struct {
        mu *sync.RWMutex
 }
 
-var _ io.WriterTo = piecePerResourcePiece{}
+var _ interface {
+       io.WriterTo
+       PieceReaderer
+} = piecePerResourcePiece{}
 
 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
        s.mu.RLock()
@@ -243,7 +247,7 @@ func (s piecePerResourcePiece) MarkNotComplete() error {
 }
 
 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
-       r, err := s.PieceReader()
+       r, err := s.NewReader()
        if err != nil {
                return
        }
@@ -358,6 +362,7 @@ func (s piecePerResourcePiece) completedInstance() resource.Instance {
        return i
 }
 
+// TODO: Add DirPrefix methods that include the "/" because it's easy to forget and always required.
 func (s piecePerResourcePiece) incompleteDirPath() string {
        return path.Join("incompleted", s.hashHex())
 }
@@ -386,16 +391,16 @@ func (s piecePerResourcePiece) getChunksReader(prefix string) (PieceReader, erro
        return chunkPieceReader{s.getChunks(prefix)}, nil
 }
 
-func (s piecePerResourcePiece) PieceReader() (PieceReader, error) {
+func (s piecePerResourcePiece) NewReader() (PieceReader, error) {
        s.mu.RLock()
        defer s.mu.RUnlock()
        if s.mustIsComplete() {
                if s.hasMovePrefix() {
-                       return s.getChunksReader(s.completedDirPath())
+                       return s.getChunksReader(s.completedDirPath() + "/")
                }
                return instancePieceReader{s.completedInstance()}, nil
        }
-       return s.getChunksReader(s.incompleteDirPath())
+       return s.getChunksReader(s.incompleteDirPath() + "/")
 }
 
 type instancePieceReader struct {
index b30949e60e947711203361e49229945809ea315a..d75adb7fc9ee2c496d1aa73d6f93c7e6e2c5f1e5 100644 (file)
@@ -34,13 +34,19 @@ type chunkReader struct {
 }
 
 func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) {
-       //TODO implement me
-       panic("implement me")
+       vi := sort.Search(len(c.values), func(i int) bool {
+               return off < c.values[i].offset+c.values[i].size
+       })
+       if vi == len(c.values) {
+               err = io.ErrUnexpectedEOF
+               return
+       }
+       v := c.values[vi]
+       return v.pv.ReadAt(p, off-v.offset)
 }
 
 func (c chunkReader) Close() error {
-       //TODO implement me
-       panic("implement me")
+       return c.r.Close()
 }
 
 type ChunkReader interface {
@@ -51,7 +57,7 @@ type ChunkReader interface {
 // TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
 // trying to read discontinuous or incomplete sequences of chunks?
 func (p Provider) ChunksReader(prefix string) (ret storage.PieceReader, err error) {
-       p.Logger.Levelf(log.Debug, "ChunkReader(%q)", prefix)
+       p.Logger.Levelf(log.Critical, "ChunkReader(%q)", prefix)
        //debug.PrintStack()
        pr, err := p.Handle.NewReader()
        if err != nil {
index 4f239736d2c4b79df28f11e56b848ab64520e48f..5406da6378834b7ce14b5999cf9b86963035bafc 100644 (file)
@@ -36,22 +36,23 @@ type Torrent struct {
 }
 
 // Deprecated. Use PieceWithHash, as this doesn't work with pure v2 torrents.
-func (t Torrent) Piece(p metainfo.Piece) Piece {
+func (t *Torrent) Piece(p metainfo.Piece) Piece {
        return t.PieceWithHash(p, g.Some(p.V1Hash().Unwrap().Bytes()))
 }
 
-func (t Torrent) PieceWithHash(p metainfo.Piece, pieceHash g.Option[[]byte]) Piece {
+func (t *Torrent) PieceWithHash(p metainfo.Piece, pieceHash g.Option[[]byte]) Piece {
        var pieceImpl PieceImpl
        if t.TorrentImpl.PieceWithHash != nil {
                pieceImpl = t.TorrentImpl.PieceWithHash(p, pieceHash)
        } else {
                pieceImpl = t.TorrentImpl.Piece(p)
        }
-       return Piece{pieceImpl, p}
+       return Piece{pieceImpl, t, p}
 }
 
 type Piece struct {
        PieceImpl
+       t   *Torrent
        mip metainfo.Piece
 }
 
@@ -120,3 +121,24 @@ func (p Piece) ReadAt(b []byte, off int64) (n int, err error) {
 
        return
 }
+
+func (p Piece) NewReader() (PieceReader, error) {
+       pr, ok := p.PieceImpl.(PieceReaderer)
+       if ok {
+               return pr.NewReader()
+       }
+       // TODO: Make generic reflect wrapper for nop Closer.
+       return struct {
+               io.ReaderAt
+               io.Closer
+       }{
+               p,
+               nopCloser{},
+       }, nil
+}
+
+type nopCloser struct{}
+
+func (nopCloser) Close() error {
+       return nil
+}
index f8254dbddcf375fb531282a6f6c7018a095cb7e3..d56050878091cc2b5dce95aa0c77b65d6054ff0b 100644 (file)
@@ -1547,7 +1547,6 @@ func (t *Torrent) logPieceRequestOrder() {
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
-       fmt.Println("byteRegionPieces", off, size)
        if off >= t.length() {
                return
        }
@@ -1560,8 +1559,8 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
        }
        begin = pieceIndex(off / t.info.PieceLength)
        end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
-       if end > pieceIndex(t.info.NumPieces()) {
-               end = pieceIndex(t.info.NumPieces())
+       if end > t.info.NumPieces() {
+               end = t.info.NumPieces()
        }
        return
 }