// Adjust the read/seek window to handle Readers locked to File extents and the like.
offset, length int64
+ storageReader storageReader
+
// Function to dynamically calculate readahead. If nil, readahead is static.
readaheadFunc ReadaheadFunc
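
// An illustrative sketch, not part of this change: a static readahead policy,
// assuming this package's ReadaheadFunc signature of
// func(ReadaheadContext) int64 and the Reader's SetReadaheadFunc setter.
func fixedReadahead(ReadaheadContext) int64 {
	// Always ask for 1 MiB ahead of the current read position.
	return 1 << 20
}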
r.t.cl.lock()
r.t.deleteReader(r)
r.t.cl.unlock()
- return nil
+ return r.storageReader.Close()
}
func (r *reader) posChanged() {
--- /dev/null
+package torrent
+
+import (
+ "io"
+
+ "github.com/anacrolix/missinggo/v2/panicif"
+
+ "github.com/anacrolix/torrent/storage"
+)
+
+// Returns a dedicated storage reader for the torrent, preferring the storage
+// implementation's own reader when it provides one.
+func (t *Torrent) storageReader() storageReader {
+ if t.storage.NewReader == nil {
+ return storagePieceReader{t: t}
+ }
+ return torrentStorageImplReader{
+ implReader: t.storage.NewReader(),
+ t: t,
+ }
+}
+
+// Fallback reader that reads through each piece's storage, for torrent
+// storage implementations that don't provide a dedicated reader.
+type storagePieceReader struct {
+ t *Torrent
+}
+
+func (me storagePieceReader) Close() error {
+ return nil
+}
+
+func (me storagePieceReader) ReadAt(b []byte, off int64) (n int, err error) {
+ for len(b) != 0 {
+ p := me.t.pieceForOffset(off)
+ p.waitNoPendingWrites()
+ var n1 int
+ n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
+	if n1 == 0 {
+		// A zero-byte read must come with an error, or this loop would
+		// never terminate.
+		panicif.Nil(err)
+		break
+	}
+ off += int64(n1)
+ n += n1
+ b = b[n1:]
+ }
+ return
+}
+
+type storageReader interface {
+ io.ReaderAt
+ io.Closer
+}
+
+// Adapts a storage implementation's own TorrentReader, waiting for pending
+// writes to the relevant piece before each read.
+type torrentStorageImplReader struct {
+ implReader storage.TorrentReader
+ t *Torrent
+}
+
+func (me torrentStorageImplReader) ReadAt(p []byte, off int64) (n int, err error) {
+ // TODO: Should waitNoPendingWrites take a region?
+ me.t.pieceForOffset(off).waitNoPendingWrites()
+ return me.implReader.ReadAt(p, off)
+}
+
+func (me torrentStorageImplReader) Close() error {
+ return me.implReader.Close()
+}
// to determine the storage for torrents sharing the same function pointer, and mutated in
// place.
Capacity TorrentCapacity
+
+	// Optional. Returns a dedicated reader for the whole torrent. If nil,
+	// reads fall back to going through each piece's storage.
+	NewReader func() TorrentReader
}
// Interacts with torrent piece data. Optional interfaces to implement include:
type SelfHashing interface {
SelfHash() (metainfo.Hash, error)
}
+
+// Implemented by pieces that support a dedicated reader.
+type PieceReaderer interface {
+ NewReader() PieceReader
+}
+
+type PieceReader interface {
+ io.ReaderAt
+ io.Closer
+}
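
// An illustrative sketch, not part of this change: an in-memory piece
// satisfying PieceReaderer. memoryPiece and bytesPieceReader are hypothetical
// names, and the standard library bytes package is assumed imported.
type memoryPiece struct{ data []byte }

func (p memoryPiece) NewReader() PieceReader {
	return bytesPieceReader{bytes.NewReader(p.data)}
}

// bytes.Reader supplies ReadAt; Close is a no-op as nothing is held open.
type bytesPieceReader struct{ *bytes.Reader }

func (bytesPieceReader) Close() error { return nil }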
ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
}
+// Optional interface for random-access reads over the chunks stored under a
+// prefix, complementing ReadConsecutiveChunks above.
+type ChunksReaderer interface {
+ ChunksReader(prefix string) (PieceReader, error)
+}
+
type PrefixDeleter interface {
DeletePrefix(prefix string) error
}
return s.completedInstance().Delete()
}
-func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- if s.mustIsComplete() {
- if s.hasMovePrefix() {
- chunks := s.getChunks(s.completedDirPath())
- return chunks.ReadAt(b, off)
- }
- return s.completedInstance().ReadAt(b, off)
+func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
+ r, err := s.PieceReader()
+ if err != nil {
+ return
}
- return s.getChunks(s.incompleteDirPath()).ReadAt(b, off)
+ defer r.Close()
+ n, err = r.ReadAt(b, off)
+ return
}
func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
_, ok := me.rp.(MovePrefixer)
return ok
}
+
+func (s piecePerResourcePiece) getChunksReader(prefix string) (PieceReader, error) {
+ if opt, ok := s.rp.(ChunksReaderer); ok {
+ return opt.ChunksReader(prefix)
+ }
+ return chunkPieceReader{s.getChunks(prefix)}, nil
+}
+
+func (s piecePerResourcePiece) PieceReader() (PieceReader, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if s.mustIsComplete() {
+ if s.hasMovePrefix() {
+ return s.getChunksReader(s.completedDirPath())
+ }
+ return instancePieceReader{s.completedInstance()}, nil
+ }
+ return s.getChunksReader(s.incompleteDirPath())
+}
+
+type instancePieceReader struct {
+ resource.Instance
+}
+
+func (instancePieceReader) Close() error {
+ return nil
+}
+
+type chunkPieceReader struct {
+ chunks
+}
+
+func (chunkPieceReader) Close() error {
+ return nil
+}
+
+// TODO: Make an embedded Closer using reflect?
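
// An illustrative alternative to the reflect TODO above, not part of this
// change: a single no-op closer over any io.ReaderAt could replace both
// wrapper types, since PieceReader only needs ReadAt and Close (assumes io
// is imported).
type noopCloserPieceReader struct{ io.ReaderAt }

func (noopCloserPieceReader) Close() error { return nil }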
)
// Extends possum resource.Provider with an efficient implementation of torrent
-// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity. TODO: Add a MarkComplete
-// method that renames incomplete chunks rather than writing them to a single giant key and deleting
-// them.
+// storage.ConsecutiveChunkReader. TODO: This doesn't expose Capacity.
type Provider struct {
possumResource.Provider
Logger log.Logger
}
-var _ storage.ConsecutiveChunkReader = Provider{}
+var _ interface {
+ storage.ConsecutiveChunkReader
+ storage.ChunksReaderer
+} = Provider{}
+
+type chunkReader struct {
+	r      possum.Reader
+	values []consecutiveValue
+}
+
+// TODO: Read from the consecutive values that overlap [off, off+len(p)),
+// analogous to the reader returned by ReadConsecutiveChunks but with random
+// access into the pinned values.
+func (c chunkReader) ReadAt(p []byte, off int64) (n int, err error) {
+	panic("implement me")
+}
+
+func (c chunkReader) Close() error {
+	// End the possum reader, releasing the values pinned in ChunksReader.
+	c.r.End()
+	return nil
+}
+
+type ChunkReader interface {
+ io.ReaderAt
+ io.Closer
+}
+
+// TODO: Should ChunksReader take the expected number of bytes, to avoid
+// returning a reader over discontinuous or incomplete sequences of chunks?
+func (p Provider) ChunksReader(prefix string) (ret storage.PieceReader, err error) {
+	p.Logger.Levelf(log.Debug, "ChunksReader(%q)", prefix)
+ pr, err := p.Handle.NewReader()
+ if err != nil {
+ return
+ }
+ defer func() {
+ if err != nil {
+ pr.End()
+ }
+ }()
+ items, err := pr.ListItems(prefix)
+ if err != nil {
+ return
+ }
+ keys := make([]int64, 0, len(items))
+ for _, item := range items {
+ var i int64
+ offsetStr := item.Key
+ i, err = strconv.ParseInt(offsetStr, 10, 64)
+ if err != nil {
+ err = fmt.Errorf("failed to parse offset %q: %w", offsetStr, err)
+ return
+ }
+ keys = append(keys, i)
+ }
+ sort.Sort(keySorter[possum.Item, int64]{items, keys})
+ offset := int64(0)
+ consValues := make([]consecutiveValue, 0, len(items))
+ for i, item := range items {
+ itemOffset := keys[i]
+ if itemOffset+item.Stat.Size() <= offset {
+		// Fully covered by the values collected so far.
+ continue
+ }
+ var v possum.Value
+ v, err = pr.Add(prefix + item.Key)
+ if err != nil {
+ return
+ }
+ consValues = append(consValues, consecutiveValue{
+ pv: v,
+ offset: itemOffset,
+ size: item.Stat.Size(),
+ })
+ offset = itemOffset + item.Stat.Size()
+ }
+ err = pr.Begin()
+ if err != nil {
+ return
+ }
+ ret = chunkReader{
+ r: pr,
+ values: consValues,
+ }
+ return
+}
// TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
// trying to read discontinuous or incomplete sequences of chunks?
--- /dev/null
+package storage
+
+import (
+ "io"
+)
+
+type TorrentReader interface {
+ io.ReaderAt
+ io.Closer
+}
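
// An illustrative sketch, not part of this change: a TorrentReader over a
// single preallocated file (fileTorrentReader is hypothetical, and the os
// package is assumed imported). A storage implementation would return one of
// these from TorrentImpl.NewReader.
type fileTorrentReader struct{ f *os.File }

func (r fileTorrentReader) ReadAt(p []byte, off int64) (int, error) {
	return r.f.ReadAt(p, off)
}

// Close is a no-op; the storage implementation owns the file's lifetime.
func (fileTorrentReader) Close() error { return nil }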
func (t *Torrent) newReader(offset, length int64) Reader {
r := reader{
- mu: t.cl.locker(),
- t: t,
- offset: offset,
- length: length,
- ctx: context.Background(),
+ mu: t.cl.locker(),
+ t: t,
+ offset: offset,
+ length: length,
+ storageReader: t.storageReader(),
+ ctx: context.Background(),
}
r.readaheadFunc = defaultReadaheadFunc
t.addReader(&r)
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/missinggo/v2/pubsub"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
// Returns the range of pieces [begin, end) that contains the extent of bytes.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
if off >= t.length() {
return
}
// Non-blocking read. Client lock is not required.
func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
- for len(b) != 0 {
- p := &t.pieces[off/t.info.PieceLength]
- p.waitNoPendingWrites()
- var n1 int
- n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
- if n1 == 0 {
- break
- }
- off += int64(n1)
- n += n1
- b = b[n1:]
- }
+ r := t.storageReader()
+ n, err = r.ReadAt(b, off)
+ panicif.Err(r.Close())
return
}
"time"
"github.com/anacrolix/dht/v2/krpc"
+
"github.com/anacrolix/torrent/tracker"
)