From: Matt Joiner Date: Wed, 7 May 2025 11:27:00 +0000 (+1000) Subject: Finish the storage reader implementation X-Git-Tag: v1.59.0~165 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=8c098d1e85d269eebf0e336372989e8f72579c40;p=btrtrc.git Finish the storage reader implementation --- diff --git a/reader.go b/reader.go index 5630c424..0a1c2297 100644 --- a/reader.go +++ b/reader.go @@ -276,7 +276,8 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er // I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to // return an error so it wasn't noticed. We now try again if there's a storage cap otherwise // convert it to io.UnexpectedEOF. - n, err = r.t.readAt(b1, r.torrentOffset(pos)) + n, err = r.storageReader.ReadAt(b1, r.torrentOffset(pos)) + //n, err = r.t.readAt(b1, r.torrentOffset(pos)) if n != 0 { err = nil return diff --git a/storage.go b/storage.go index 40afbb87..1acfb8ef 100644 --- a/storage.go +++ b/storage.go @@ -10,7 +10,7 @@ import ( func (t *Torrent) storageReader() storageReader { if t.storage.NewReader == nil { - return storagePieceReader{t: t} + return &storagePieceReader{t: t} } return torrentStorageImplReader{ implReader: t.storage.NewReader(), @@ -18,20 +18,44 @@ func (t *Torrent) storageReader() storageReader { } } +// This wraps per-piece storage as a whole-torrent storageReader. type storagePieceReader struct { - t *Torrent + t *Torrent + pr storage.PieceReader + prIndex int } -func (me storagePieceReader) Close() error { - return nil +func (me *storagePieceReader) Close() (err error) { + if me.pr != nil { + err = me.pr.Close() + } + return +} + +func (me *storagePieceReader) getReaderAt(p *Piece) (err error) { + if me.pr != nil { + if me.prIndex == p.index { + return + } + panicif.Err(me.pr.Close()) + me.pr = nil + } + ps := p.Storage() + me.prIndex = p.index + me.pr, err = ps.NewReader() + return } -func (me storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) { +func (me *storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) { for len(b) != 0 { p := me.t.pieceForOffset(off) p.waitNoPendingWrites() var n1 int - n1, err = p.Storage().ReadAt(b, off-p.Info().Offset()) + err = me.getReaderAt(p) + if err != nil { + return + } + n1, err = me.pr.ReadAt(b, off-p.Info().Offset()) if n1 == 0 { panicif.Nil(err) break @@ -48,6 +72,7 @@ type storageReader interface { io.Closer } +// This wraps a storage impl provided TorrentReader as a storageReader. type torrentStorageImplReader struct { implReader storage.TorrentReader t *Torrent diff --git a/storage/interface.go b/storage/interface.go index 3ef528b2..b25e360d 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -38,13 +38,15 @@ type TorrentImpl struct { // place. Capacity TorrentCapacity - NewReader func() TorrentReader + NewReader func() TorrentReader + NewPieceReader func(p Piece) PieceReader } -// Interacts with torrent piece data. Optional interfaces to implement include: +// Interacts with torrent piece data. Optional interfaces to implement include:// // -// io.WriterTo, such as when a piece supports a more efficient way to write out incomplete chunks. -// SelfHashing, such as when a piece supports a more efficient way to hash its contents. +// io.WriterTo, such as when a piece supports a more efficient way to write out incomplete chunks. +// SelfHashing, such as when a piece supports a more efficient way to hash its contents. +// PieceReaderer when it has a stateful Reader interface that is more efficient. type PieceImpl interface { // These interfaces are not as strict as normally required. They can // assume that the parameters are appropriate for the dimensions of the @@ -71,14 +73,15 @@ type Completion struct { Complete bool } -// Allows a storage backend to override hashing (i.e. if it can do it more efficiently than the torrent client can) +// Allows a storage backend to override hashing (i.e. if it can do it more efficiently than the +// torrent client can). type SelfHashing interface { SelfHash() (metainfo.Hash, error) } // Piece supports dedicated reader. type PieceReaderer interface { - NewReader() PieceReader + NewReader() (PieceReader, error) } type PieceReader interface { diff --git a/storage/piece-resource.go b/storage/piece-resource.go index d7538a24..8e73b829 100644 --- a/storage/piece-resource.go +++ b/storage/piece-resource.go @@ -60,11 +60,12 @@ func (s piecePerResource) OpenTorrent( s, make([]sync.RWMutex, info.NumPieces()), } - return TorrentImpl{ + ret := TorrentImpl{ PieceWithHash: t.Piece, Close: t.Close, Capacity: s.opts.Capacity, - }, nil + } + return ret, nil } func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl { @@ -106,7 +107,10 @@ type piecePerResourcePiece struct { mu *sync.RWMutex } -var _ io.WriterTo = piecePerResourcePiece{} +var _ interface { + io.WriterTo + PieceReaderer +} = piecePerResourcePiece{} func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) { s.mu.RLock() @@ -243,7 +247,7 @@ func (s piecePerResourcePiece) MarkNotComplete() error { } func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) { - r, err := s.PieceReader() + r, err := s.NewReader() if err != nil { return } @@ -358,6 +362,7 @@ func (s piecePerResourcePiece) completedInstance() resource.Instance { return i } +// TODO: Add DirPrefix methods that include the "/" because it's easy to forget and always required. func (s piecePerResourcePiece) incompleteDirPath() string { return path.Join("incompleted", s.hashHex()) } @@ -386,16 +391,16 @@ func (s piecePerResourcePiece) getChunksReader(prefix string) (PieceReader, erro return chunkPieceReader{s.getChunks(prefix)}, nil } -func (s piecePerResourcePiece) PieceReader() (PieceReader, error) { +func (s piecePerResourcePiece) NewReader() (PieceReader, error) { s.mu.RLock() defer s.mu.RUnlock() if s.mustIsComplete() { if s.hasMovePrefix() { - return s.getChunksReader(s.completedDirPath()) + return s.getChunksReader(s.completedDirPath() + "/") } return instancePieceReader{s.completedInstance()}, nil } - return s.getChunksReader(s.incompleteDirPath()) + return s.getChunksReader(s.incompleteDirPath() + "/") } type instancePieceReader struct { diff --git a/storage/possum/possum-provider.go b/storage/possum/possum-provider.go index b30949e6..d75adb7f 100644 --- a/storage/possum/possum-provider.go +++ b/storage/possum/possum-provider.go @@ -34,13 +34,19 @@ type chunkReader struct { } func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) { - //TODO implement me - panic("implement me") + vi := sort.Search(len(c.values), func(i int) bool { + return off < c.values[i].offset+c.values[i].size + }) + if vi == len(c.values) { + err = io.ErrUnexpectedEOF + return + } + v := c.values[vi] + return v.pv.ReadAt(p, off-v.offset) } func (c chunkReader) Close() error { - //TODO implement me - panic("implement me") + return c.r.Close() } type ChunkReader interface { @@ -51,7 +57,7 @@ type ChunkReader interface { // TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid // trying to read discontinuous or incomplete sequences of chunks? func (p Provider) ChunksReader(prefix string) (ret storage.PieceReader, err error) { - p.Logger.Levelf(log.Debug, "ChunkReader(%q)", prefix) + p.Logger.Levelf(log.Critical, "ChunkReader(%q)", prefix) //debug.PrintStack() pr, err := p.Handle.NewReader() if err != nil { diff --git a/storage/wrappers.go b/storage/wrappers.go index 4f239736..5406da63 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -36,22 +36,23 @@ type Torrent struct { } // Deprecated. Use PieceWithHash, as this doesn't work with pure v2 torrents. -func (t Torrent) Piece(p metainfo.Piece) Piece { +func (t *Torrent) Piece(p metainfo.Piece) Piece { return t.PieceWithHash(p, g.Some(p.V1Hash().Unwrap().Bytes())) } -func (t Torrent) PieceWithHash(p metainfo.Piece, pieceHash g.Option[[]byte]) Piece { +func (t *Torrent) PieceWithHash(p metainfo.Piece, pieceHash g.Option[[]byte]) Piece { var pieceImpl PieceImpl if t.TorrentImpl.PieceWithHash != nil { pieceImpl = t.TorrentImpl.PieceWithHash(p, pieceHash) } else { pieceImpl = t.TorrentImpl.Piece(p) } - return Piece{pieceImpl, p} + return Piece{pieceImpl, t, p} } type Piece struct { PieceImpl + t *Torrent mip metainfo.Piece } @@ -120,3 +121,24 @@ func (p Piece) ReadAt(b []byte, off int64) (n int, err error) { return } + +func (p Piece) NewReader() (PieceReader, error) { + pr, ok := p.PieceImpl.(PieceReaderer) + if ok { + return pr.NewReader() + } + // TODO: Make generic reflect wrapper for nop Closer. + return struct { + io.ReaderAt + io.Closer + }{ + p, + nopCloser{}, + }, nil +} + +type nopCloser struct{} + +func (nopCloser) Close() error { + return nil +} diff --git a/torrent.go b/torrent.go index f8254dbd..d5605087 100644 --- a/torrent.go +++ b/torrent.go @@ -1547,7 +1547,6 @@ func (t *Torrent) logPieceRequestOrder() { // Returns the range of pieces [begin, end) that contains the extent of bytes. func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) { - fmt.Println("byteRegionPieces", off, size) if off >= t.length() { return } @@ -1560,8 +1559,8 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) { } begin = pieceIndex(off / t.info.PieceLength) end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength) - if end > pieceIndex(t.info.NumPieces()) { - end = pieceIndex(t.info.NumPieces()) + if end > t.info.NumPieces() { + end = t.info.NumPieces() } return }