From 0b553b296f535d8a1dad872985551503e846ef69 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 21 Jan 2018 22:49:12 +1100 Subject: [PATCH] Add File priorities Fixes #220. --- cmd/torrent-pick/main.go | 1 + file.go | 44 +++++++++++++++++++++--------- fs/torrentfs.go | 4 +-- issue97_test.go | 2 ++ peer_protocol/protocol.go | 2 +- piece.go | 11 +++++++- t.go | 24 +++++++++-------- torrent.go | 57 ++++++++++++++++++++++++++++----------- 8 files changed, 102 insertions(+), 43 deletions(-) diff --git a/cmd/torrent-pick/main.go b/cmd/torrent-pick/main.go index f5a506d2..0fb6971d 100644 --- a/cmd/torrent-pick/main.go +++ b/cmd/torrent-pick/main.go @@ -162,6 +162,7 @@ func main() { if file.DisplayPath() != rootGroup.Pick { continue } + file.Download() srcReader := file.NewReader() defer srcReader.Close() io.Copy(dstWriter, srcReader) diff --git a/file.go b/file.go index 9804b104..70df07b4 100644 --- a/file.go +++ b/file.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/anacrolix/torrent/metainfo" + pwp "github.com/anacrolix/torrent/peer_protocol" ) // Provides access to regions of torrent data that correspond to its files. @@ -13,6 +14,7 @@ type File struct { offset int64 length int64 fi metainfo.FileInfo + prio piecePriority } func (f *File) Torrent() *Torrent { @@ -81,18 +83,7 @@ func (f *File) State() (ret []FilePieceState) { // Requests that all pieces containing data in the file be downloaded. func (f *File) Download() { - f.t.DownloadPieces(f.t.byteRegionPieces(f.offset, f.length)) -} - -// Deprecated: Use File.DownloadRegion. -func (f *File) PrioritizeRegion(off, len int64) { - f.DownloadRegion(off, len) -} - -// Requests that torrent pieces containing bytes in the given region of the -// file be downloaded. -func (f *File) DownloadRegion(off, len int64) { - f.t.DownloadPieces(f.t.byteRegionPieces(f.offset+off, len)) + f.SetPriority(PiecePriorityNormal) } func byteRegionExclusivePieces(off, size, pieceSize int64) (begin, end int) { @@ -105,8 +96,9 @@ func (f *File) exclusivePieces() (begin, end int) { return byteRegionExclusivePieces(f.offset, f.length, int64(f.t.usualPieceSize())) } +// Deprecated: Use File.SetPriority. func (f *File) Cancel() { - f.t.CancelPieces(f.exclusivePieces()) + f.SetPriority(PiecePriorityNone) } func (f *File) NewReader() Reader { @@ -120,3 +112,29 @@ func (f *File) NewReader() Reader { f.t.addReader(&tr) return &tr } + +// Sets the minimum priority for pieces in the File. +func (f *File) SetPriority(prio piecePriority) { + f.t.cl.mu.Lock() + defer f.t.cl.mu.Unlock() + if prio == f.prio { + return + } + f.prio = prio + f.t.updatePiecePriorities(f.firstPieceIndex().Int(), f.lastPieceIndex().Int()+1) +} + +// Returns the priority per File.SetPriority. +func (f *File) Priority() piecePriority { + f.t.cl.mu.Lock() + defer f.t.cl.mu.Unlock() + return f.prio +} + +func (f *File) firstPieceIndex() pwp.Integer { + return pwp.Integer(f.offset / int64(f.t.usualPieceSize())) +} + +func (f *File) lastPieceIndex() pwp.Integer { + return pwp.Integer((f.offset + f.length) / int64(f.t.usualPieceSize())) +} diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 5fdc95e8..52b5ff59 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -106,7 +106,7 @@ func (dn dirNode) Lookup(_ context.Context, name string) (fusefs.Node, error) { fullPath := dn.path + "/" + name for _, f := range dn.t.Files() { if f.DisplayPath() == fullPath { - file = &f + file = f } if isSubPath(fullPath, f.DisplayPath()) { dir = true @@ -143,7 +143,7 @@ func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, t: t, } if !info.IsDir() { - _node = fileNode{__node, &t.Files()[0]} + _node = fileNode{__node, t.Files()[0]} } else { _node = dirNode{__node} } diff --git a/issue97_test.go b/issue97_test.go index 5e1ab8f9..7dc7a322 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -21,6 +21,8 @@ func TestHashPieceAfterStorageClosed(t *testing.T) { info, err := mi.UnmarshalInfo() require.NoError(t, err) tt.info = &info + tt.cacheLength(&info) + tt.initFiles() tt.makePieces() tt.storage, err = cs.OpenTorrent(tt.info, mi.HashInfoBytes()) require.NoError(t, err) diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go index 0dc5784c..bc4d6ee6 100644 --- a/peer_protocol/protocol.go +++ b/peer_protocol/protocol.go @@ -20,7 +20,7 @@ func (i *Integer) Read(r io.Reader) error { return binary.Read(r, binary.BigEndian, i) } -// It's perfectly fine to cast these to an int. +// It's perfectly fine to cast these to an int. TODO: Or is it? func (i Integer) Int() int { return int(i) } diff --git a/piece.go b/piece.go index 71c60417..c5d7f134 100644 --- a/piece.go +++ b/piece.go @@ -27,7 +27,7 @@ func (me piecePriority) BitmapPriority() int { } const ( - PiecePriorityNone piecePriority = iota // Not wanted. + PiecePriorityNone piecePriority = iota // Not wanted. Must be the zero value. PiecePriorityNormal // Wanted. PiecePriorityHigh // Wanted a lot. PiecePriorityReadahead // May be required soon. @@ -42,6 +42,7 @@ type Piece struct { hash metainfo.Hash t *Torrent index int + files []*File // Chunks we've written to since the last check. The chunk offset and // length can be determined by the request chunkSize in use. dirtyChunks bitmap.Bitmap @@ -192,3 +193,11 @@ func (p *Piece) VerifyData() { func (p *Piece) queuedForHash() bool { return p.t.piecesQueuedForHash.Get(p.index) } + +func (p *Piece) torrentBeginOffset() int64 { + return int64(p.index) * p.t.info.PieceLength +} + +func (p *Piece) torrentEndOffset() int64 { + return p.torrentBeginOffset() + int64(p.length()) +} diff --git a/t.go b/t.go index 93d8c3bf..4d288b7a 100644 --- a/t.go +++ b/t.go @@ -158,25 +158,27 @@ func (t *Torrent) CancelPieces(begin, end int) { t.unpendPieceRange(begin, end) } -// Returns handles to the files in the torrent. This requires the metainfo is -// available first. -func (t *Torrent) Files() (ret []File) { - info := t.Info() - if info == nil { - return - } +func (t *Torrent) initFiles() { var offset int64 - for _, fi := range info.UpvertedFiles() { - ret = append(ret, File{ + t.files = new([]*File) + for _, fi := range t.info.UpvertedFiles() { + *t.files = append(*t.files, &File{ t, - strings.Join(append([]string{info.Name}, fi.Path...), "/"), + strings.Join(append([]string{t.info.Name}, fi.Path...), "/"), offset, fi.Length, fi, + PiecePriorityNone, }) offset += fi.Length } - return + +} + +// Returns handles to the files in the torrent. This requires that the Info is +// available first. +func (t *Torrent) Files() []*File { + return *t.files } func (t *Torrent) AddPeers(pp []Peer) { diff --git a/torrent.go b/torrent.go index 113d666c..981f5b85 100644 --- a/torrent.go +++ b/torrent.go @@ -16,12 +16,11 @@ import ( "text/tabwriter" "time" - "github.com/anacrolix/missinggo/prioritybitmap" - "github.com/anacrolix/dht" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/perf" + "github.com/anacrolix/missinggo/prioritybitmap" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" "github.com/bradfitz/iter" @@ -76,7 +75,8 @@ type Torrent struct { metainfo metainfo.MetaInfo // The info dict. nil if we don't have it (yet). - info *metainfo.Info + info *metainfo.Info + files *[]*File // Active peer connections, running message stream loops. conns map[*connection]struct{} @@ -305,7 +305,34 @@ func (t *Torrent) makePieces() { piece.index = i piece.noPendingWrites.L = &piece.pendingWritesMutex missinggo.CopyExact(piece.hash[:], hash) + piece.files = (*t.files)[pieceFirstFileIndex(piece.torrentBeginOffset(), *t.files):pieceLastFileIndex(piece.torrentEndOffset(), *t.files)] + } +} + +func pieceFirstFileIndex(pieceOffset int64, files []*File) int { + for i, f := range files { + if f.offset+f.length > pieceOffset { + return i + } } + return -1 +} + +func pieceLastFileIndex(pieceEndOffset int64, files []*File) int { + for i, f := range files { + if f.offset+f.length >= pieceEndOffset { + return i + } + } + return -1 +} + +func (t *Torrent) cacheLength(info *metainfo.Info) { + var l int64 + for _, f := range t.info.UpvertedFiles() { + l += f.Length + } + t.length = &l } // Called when metadata for a torrent becomes available. @@ -327,6 +354,7 @@ func (t *Torrent) setInfoBytes(b []byte) error { } defer t.updateWantPeersEvent() t.info = &info + t.initFiles() t.displayName = "" // Save a few bytes lol. t.cl.event.Broadcast() t.gotMetainfo.Set() @@ -334,11 +362,7 @@ func (t *Torrent) setInfoBytes(b []byte) error { if err != nil { return fmt.Errorf("error opening torrent storage: %s", err) } - var l int64 - for _, f := range t.info.UpvertedFiles() { - l += f.Length - } - t.length = &l + t.cacheLength(&info) t.metadataBytes = b t.metadataCompletedChunks = nil t.makePieces() @@ -945,23 +969,26 @@ func (t *Torrent) piecePriority(piece int) piecePriority { return t.pieces[piece].priority } -func (t *Torrent) piecePriorityUncached(piece int) piecePriority { - if t.pieceComplete(piece) { - return PiecePriorityNone +func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) { + for _, f := range t.pieces[piece].files { + ret.Raise(f.prio) } if t.readerNowPieces.Contains(piece) { - return PiecePriorityNow + ret.Raise(PiecePriorityNow) } // if t.readerNowPieces.Contains(piece - 1) { // return PiecePriorityNext // } if t.readerReadaheadPieces.Contains(piece) { - return PiecePriorityReadahead + ret.Raise(PiecePriorityReadahead) } if t.pendingPieces.Contains(piece) { - return PiecePriorityNormal + ret.Raise(PiecePriorityNormal) } - return PiecePriorityNone + if t.pieceComplete(piece) { + return PiecePriorityNone + } + return } func (t *Torrent) pendPiece(piece int) { -- 2.44.0