// I think we can get EOF here due to the ReadAt contract. Previously we were forgetting to
// return an error so it wasn't noticed. We now try again if there's a storage cap otherwise
// convert it to io.UnexpectedEOF.
- n, err = r.t.readAt(b1, r.torrentOffset(pos))
+ n, err = r.storageReader.ReadAt(b1, r.torrentOffset(pos))
+ //n, err = r.t.readAt(b1, r.torrentOffset(pos))
if n != 0 {
err = nil
return
+// Returns a reader over the whole torrent's storage. Prefers a
+// storage-impl-provided TorrentReader; otherwise falls back to per-piece
+// reads via storagePieceReader.
func (t *Torrent) storageReader() storageReader {
if t.storage.NewReader == nil {
- return storagePieceReader{t: t}
+ // Pointer required: storagePieceReader caches an open PieceReader
+ // between calls (see getReaderAt), so its methods mutate the receiver.
+ return &storagePieceReader{t: t}
}
return torrentStorageImplReader{
implReader: t.storage.NewReader(),
}
}
+// This wraps per-piece storage as a whole-torrent storageReader.
type storagePieceReader struct {
- t *Torrent
+ t *Torrent
+ // Open reader for the piece at prIndex, reused across ReadAt calls so
+ // sequential reads within a piece don't reopen storage. nil when no
+ // piece reader is currently open.
+ pr storage.PieceReader
+ prIndex int
}
-func (me storagePieceReader) Close() error {
- return nil
+// Close releases the cached piece reader, if one is open.
+func (me *storagePieceReader) Close() (err error) {
+ if me.pr != nil {
+ err = me.pr.Close()
+ }
+ return
+}
+
+// getReaderAt ensures me.pr is an open reader for piece p, closing and
+// discarding any reader cached for a different piece first.
+func (me *storagePieceReader) getReaderAt(p *Piece) (err error) {
+ if me.pr != nil {
+ if me.prIndex == p.index {
+ // Cache hit: reuse the already-open reader.
+ return
+ }
+ panicif.Err(me.pr.Close())
+ me.pr = nil
+ }
+ ps := p.Storage()
+ // NOTE(review): prIndex is assigned before NewReader succeeds. Harmless
+ // today because a nil pr forces a retry on the next call, but assigning
+ // after success would be tidier.
+ me.prIndex = p.index
+ me.pr, err = ps.NewReader()
+ return
}
-func (me storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) {
+func (me *storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) {
for len(b) != 0 {
p := me.t.pieceForOffset(off)
p.waitNoPendingWrites()
var n1 int
- n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
+ err = me.getReaderAt(p)
+ if err != nil {
+ return
+ }
+ n1, err = me.pr.ReadAt(b, off-p.Info().Offset())
if n1 == 0 {
panicif.Nil(err)
break
io.Closer
}
+// This wraps a storage impl provided TorrentReader as a storageReader.
type torrentStorageImplReader struct {
implReader storage.TorrentReader
t *Torrent
// place.
Capacity TorrentCapacity
- NewReader func() TorrentReader
+ NewReader func() TorrentReader
+ NewPieceReader func(p Piece) PieceReader
}
-// Interacts with torrent piece data. Optional interfaces to implement include:
+// Interacts with torrent piece data. Optional interfaces to implement include:
//
-// io.WriterTo, such as when a piece supports a more efficient way to write out incomplete chunks.
-// SelfHashing, such as when a piece supports a more efficient way to hash its contents.
+// io.WriterTo, such as when a piece supports a more efficient way to write out incomplete chunks.
+// SelfHashing, such as when a piece supports a more efficient way to hash its contents.
+// PieceReaderer when it has a stateful Reader interface that is more efficient.
type PieceImpl interface {
// These interfaces are not as strict as normally required. They can
// assume that the parameters are appropriate for the dimensions of the
Complete bool
}
-// Allows a storage backend to override hashing (i.e. if it can do it more efficiently than the torrent client can)
+// Allows a storage backend to override hashing (i.e. if it can do it more efficiently than the
+// torrent client can).
type SelfHashing interface {
+ // SelfHash returns the hash of the piece's contents as computed by the
+ // storage backend.
SelfHash() (metainfo.Hash, error)
}
// Piece supports dedicated reader.
type PieceReaderer interface {
- NewReader() PieceReader
+ // NewReader now returns an error so implementations can report failure to
+ // open piece storage instead of having to panic.
+ NewReader() (PieceReader, error)
}
type PieceReader interface {
s,
make([]sync.RWMutex, info.NumPieces()),
}
- return TorrentImpl{
+ ret := TorrentImpl{
PieceWithHash: t.Piece,
Close: t.Close,
Capacity: s.opts.Capacity,
- }, nil
+ }
+ return ret, nil
}
func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl {
mu *sync.RWMutex
}
-var _ io.WriterTo = piecePerResourcePiece{}
+// Compile-time assertions that piecePerResourcePiece provides the optional
+// storage interfaces.
+var _ interface {
+ io.WriterTo
+ PieceReaderer
+} = piecePerResourcePiece{}
func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
s.mu.RLock()
}
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
- r, err := s.PieceReader()
+ r, err := s.NewReader()
if err != nil {
return
}
return i
}
+// TODO: Add DirPrefix methods that include the "/" because it's easy to forget and always required.
+
+// incompleteDirPath is the resource path prefix for chunks of a piece that
+// hasn't been verified complete yet.
func (s piecePerResourcePiece) incompleteDirPath() string {
return path.Join("incompleted", s.hashHex())
}
return chunkPieceReader{s.getChunks(prefix)}, nil
}
-func (s piecePerResourcePiece) PieceReader() (PieceReader, error) {
+// NewReader implements PieceReaderer (renamed from PieceReader to match the
+// interface). Completed pieces read from the consolidated instance, or from
+// the completed chunk dir when a move prefix is in use; incomplete pieces
+// always read from the incomplete chunk dir.
+func (s piecePerResourcePiece) NewReader() (PieceReader, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.mustIsComplete() {
if s.hasMovePrefix() {
- return s.getChunksReader(s.completedDirPath())
+ // Trailing "/" is always required by the chunk prefix lookup (see the
+ // TODO on the DirPath methods).
+ return s.getChunksReader(s.completedDirPath() + "/")
}
return instancePieceReader{s.completedInstance()}, nil
}
- return s.getChunksReader(s.incompleteDirPath())
+ return s.getChunksReader(s.incompleteDirPath() + "/")
}
type instancePieceReader struct {
}
func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) {
- //TODO implement me
- panic("implement me")
+ // Binary search for the first chunk whose extent ends past off. Assumes
+ // c.values is sorted by offset — TODO confirm against getChunks.
+ vi := sort.Search(len(c.values), func(i int) bool {
+ return off < c.values[i].offset+c.values[i].size
+ })
+ if vi == len(c.values) {
+ // off is beyond the last stored chunk.
+ err = io.ErrUnexpectedEOF
+ return
+ }
+ v := c.values[vi]
+ // NOTE(review): if off < v.offset (a gap before this chunk), off-v.offset
+ // is negative and the underlying ReadAt will error; verify gaps can't
+ // occur here. Also relies on the inner ReadAt's short-read error when p
+ // spans past this chunk.
+ return v.pv.ReadAt(p, off-v.offset)
}
func (c chunkReader) Close() error {
- //TODO implement me
- panic("implement me")
+ // NOTE(review): presumably c.r is the underlying chunk resource reader —
+ // confirm the field against the chunkReader struct (not visible here).
+ return c.r.Close()
}
type ChunkReader interface {
// TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
// trying to read discontinuous or incomplete sequences of chunks?
func (p Provider) ChunksReader(prefix string) (ret storage.PieceReader, err error) {
- p.Logger.Levelf(log.Debug, "ChunkReader(%q)", prefix)
+ p.Logger.Levelf(log.Critical, "ChunkReader(%q)", prefix)
//debug.PrintStack()
pr, err := p.Handle.NewReader()
if err != nil {
}
// Deprecated. Use PieceWithHash, as this doesn't work with pure v2 torrents.
-func (t Torrent) Piece(p metainfo.Piece) Piece {
+// Receiver changed to *Torrent so the returned Piece can retain t (see
+// PieceWithHash).
+func (t *Torrent) Piece(p metainfo.Piece) Piece {
return t.PieceWithHash(p, g.Some(p.V1Hash().Unwrap().Bytes()))
}
-func (t Torrent) PieceWithHash(p metainfo.Piece, pieceHash g.Option[[]byte]) Piece {
+// PieceWithHash builds a Piece, preferring the impl's hash-aware constructor
+// when provided. Pointer receiver so the Piece can hold a reference back to t.
+func (t *Torrent) PieceWithHash(p metainfo.Piece, pieceHash g.Option[[]byte]) Piece {
var pieceImpl PieceImpl
if t.TorrentImpl.PieceWithHash != nil {
pieceImpl = t.TorrentImpl.PieceWithHash(p, pieceHash)
} else {
pieceImpl = t.TorrentImpl.Piece(p)
}
- return Piece{pieceImpl, p}
+ return Piece{pieceImpl, t, p}
}
type Piece struct {
PieceImpl
+ // Back-reference to the owning storage Torrent, set by PieceWithHash.
+ t *Torrent
mip metainfo.Piece
}
return
}
+
+// NewReader returns a dedicated reader for the piece. If the impl supports
+// PieceReaderer that is used directly; otherwise the piece's plain ReaderAt
+// is wrapped with a no-op Close.
+func (p Piece) NewReader() (PieceReader, error) {
+ pr, ok := p.PieceImpl.(PieceReaderer)
+ if ok {
+ return pr.NewReader()
+ }
+ // TODO: Make generic reflect wrapper for nop Closer.
+ return struct {
+ io.ReaderAt
+ io.Closer
+ }{
+ p,
+ nopCloser{},
+ }, nil
+}
+
+// nopCloser is an io.Closer whose Close always succeeds and does nothing.
+type nopCloser struct{}
+
+func (nopCloser) Close() error {
+ return nil
+}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
- fmt.Println("byteRegionPieces", off, size)
if off >= t.length() {
return
}
}
begin = pieceIndex(off / t.info.PieceLength)
end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
- if end > pieceIndex(t.info.NumPieces()) {
- end = pieceIndex(t.info.NumPieces())
+ if end > t.info.NumPieces() {
+ end = t.info.NumPieces()
}
return
}