]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move priority management entirely into the download strategies
authorMatt Joiner <anacrolix@gmail.com>
Thu, 24 Jul 2014 03:42:31 +0000 (13:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 24 Jul 2014 03:42:31 +0000 (13:42 +1000)
client.go
download_strategies.go
torrent.go

index 0567192b72efe19f0990285b65db66912cd8df45..093cbe01df5df2ed96d84a42cf84bdc11e6471d3 100644 (file)
--- a/client.go
+++ b/client.go
@@ -19,7 +19,6 @@ import (
        "bitbucket.org/anacrolix/go.torrent/dht"
        "bitbucket.org/anacrolix/go.torrent/util"
        "bufio"
-       "container/list"
        "crypto/rand"
        "crypto/sha1"
        "errors"
@@ -64,30 +63,7 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
        if !t.haveInfo() {
                return errors.New("missing metadata")
        }
-       newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
-       for len_ > 0 {
-               req, ok := t.offsetRequest(off)
-               if !ok {
-                       return errors.New("bad offset")
-               }
-               reqOff := t.requestOffset(req)
-               // Gain the alignment adjustment.
-               len_ += off - reqOff
-               // Lose the length of this block.
-               len_ -= int64(req.Length)
-               off = reqOff + int64(req.Length)
-               if !t.wantPiece(int(req.Index)) {
-                       continue
-               }
-               newPriorities = append(newPriorities, req)
-       }
-       if len(newPriorities) == 0 {
-               return nil
-       }
-       t.Priorities.PushFront(newPriorities[0])
-       for _, req := range newPriorities[1:] {
-               t.Priorities.PushBack(req)
-       }
+       me.DownloadStrategy.TorrentPrioritize(t, off, len_)
        for _, cn := range t.Conns {
                me.replenishConnRequests(t, cn)
        }
@@ -158,7 +134,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
                err = io.EOF
                return
        }
-       t.lastReadPiece = int(index)
        piece := t.Pieces[index]
        pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
        high := int(t.PieceLength(index) - pieceOff)
@@ -1142,13 +1117,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        }
 
        // Unprioritize the chunk.
-       var next *list.Element
-       for e := t.Priorities.Front(); e != nil; e = next {
-               next = e.Next()
-               if e.Value.(request) == req {
-                       t.Priorities.Remove(e)
-               }
-       }
+       me.DownloadStrategy.TorrentGotChunk(t, req)
 
        // Cancel pending requests for this chunk.
        cancelled := false
@@ -1189,14 +1158,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
        p.EverHashed = true
        if correct {
                p.PendingChunkSpecs = nil
-               // log.Printf("%s: got piece %d, (%d/%d)", t, piece, t.NumPiecesCompleted(), t.NumPieces())
-               var next *list.Element
-               for e := t.Priorities.Front(); e != nil; e = next {
-                       next = e.Next()
-                       if e.Value.(request).Index == piece {
-                               t.Priorities.Remove(e)
-                       }
-               }
+               me.DownloadStrategy.TorrentGotPiece(t, int(piece))
                me.dataReady(dataSpec{
                        t.InfoHash,
                        request{
index 43bd1d2d864e97db9cc7344861c8bb39a3d4a860..cd42ecbd175b682fbcc8551a52331985e36ee54d 100644 (file)
@@ -2,6 +2,7 @@ package torrent
 
 import (
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+       "container/list"
 )
 
 type DownloadStrategy interface {
@@ -9,6 +10,9 @@ type DownloadStrategy interface {
        TorrentStarted(t *torrent)
        TorrentStopped(t *torrent)
        DeleteRequest(t *torrent, r request)
+       TorrentPrioritize(t *torrent, off, _len int64)
+       TorrentGotChunk(t *torrent, r request)
+       TorrentGotPiece(t *torrent, piece int)
 }
 
 type DefaultDownloadStrategy struct {
@@ -45,11 +49,11 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
                return
        }
        // First request prioritized chunks.
-       for e := t.Priorities.Front(); e != nil; e = e.Next() {
-               if !addRequest(e.Value.(request)) {
-                       return
-               }
-       }
+       // for e := t.Priorities.Front(); e != nil; e = e.Next() {
+       //      if !addRequest(e.Value.(request)) {
+       //              return
+       //      }
+       // }
        // Then finish off incomplete pieces in order of bytes remaining.
        for _, heatThreshold := range []int{1, 4, 15, 60} {
                for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
@@ -93,31 +97,68 @@ func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
        m[r]--
 }
 
-type ResponsiveDownloadStrategy struct {
+func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request)      {}
+func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int)      {}
+func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
+
+func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
+       return &responsiveDownloadStrategy{
+               Readahead:     readahead,
+               lastReadPiece: make(map[*torrent]int),
+               priorities:    make(map[*torrent]*list.List),
+       }
+}
+
+type responsiveDownloadStrategy struct {
        // How many bytes to preemptively download starting at the beginning of
        // the last piece read for a given torrent.
-       Readahead int
+       Readahead     int
+       lastReadPiece map[*torrent]int
+       priorities    map[*torrent]*list.List
 }
 
-func (ResponsiveDownloadStrategy) TorrentStarted(*torrent)         {}
-func (ResponsiveDownloadStrategy) TorrentStopped(*torrent)         {}
-func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
+func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
+       me.priorities[t] = list.New()
+}
+
+func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
+       delete(me.lastReadPiece, t)
+       delete(me.priorities, t)
+}
+func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
 
-func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
-       for e := t.Priorities.Front(); e != nil; e = e.Next() {
+func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+       if len(c.Requests) >= (c.PeerMaxRequests+1)/2 || len(c.Requests) >= 64 {
+               return
+       }
+
+       // Short circuit request fills at a level that might reduce receiving of
+       // unnecessary chunks.
+       requestWrapper := func(r request) bool {
+               if len(c.Requests) >= 64 {
+                       return false
+               }
+               return c.Request(r)
+       }
+
+       prios := me.priorities[t]
+       for e := prios.Front(); e != nil; e = e.Next() {
                req := e.Value.(request)
                if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
                        panic(req)
                }
-               if !c.Request(e.Value.(request)) {
+               if !requestWrapper(e.Value.(request)) {
                        return
                }
        }
-       readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
-       for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
-               for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
-                       if !c.Request(request{pp.Integer(i), cs}) {
-                               return
+
+       if lastReadPiece, ok := me.lastReadPiece[t]; ok {
+               readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
+               for i := lastReadPiece; i < lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
+                       for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
+                               if !requestWrapper(request{pp.Integer(i), cs}) {
+                                       return
+                               }
                        }
                }
        }
@@ -131,9 +172,63 @@ func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
                // Request chunks in random order to reduce overlap with other
                // connections.
                for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
-                       if !c.Request(request{pp.Integer(index), cs}) {
+                       if !requestWrapper(request{pp.Integer(index), cs}) {
                                return
                        }
                }
        }
 }
+
+func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
+       prios := me.priorities[t]
+       var next *list.Element
+       for e := prios.Front(); e != nil; e = next {
+               next = e.Next()
+               if e.Value.(request) == req {
+                       prios.Remove(e)
+               }
+       }
+}
+
+func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
+       var next *list.Element
+       prios := me.priorities[t]
+       for e := prios.Front(); e != nil; e = next {
+               next = e.Next()
+               if int(e.Value.(request).Index) == piece {
+                       prios.Remove(e)
+               }
+       }
+}
+
+func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
+       newPriorities := make([]request, 0, (_len+chunkSize-1)/chunkSize)
+       for _len > 0 {
+               req, ok := t.offsetRequest(off)
+               if !ok {
+                       panic("bad offset")
+               }
+               reqOff := t.requestOffset(req)
+               // Gain the alignment adjustment.
+               _len += off - reqOff
+               // Lose the length of this block.
+               _len -= int64(req.Length)
+               off = reqOff + int64(req.Length)
+               if !t.wantPiece(int(req.Index)) {
+                       continue
+               }
+               newPriorities = append(newPriorities, req)
+       }
+       if len(newPriorities) == 0 {
+               return
+       }
+       s.lastReadPiece[t] = int(newPriorities[0].Index)
+       if t.wantChunk(newPriorities[0]) {
+               s.priorities[t].PushFront(newPriorities[0])
+       }
+       for _, req := range newPriorities[1:] {
+               if t.wantChunk(req) {
+                       s.priorities[t].PushBack(req)
+               }
+       }
+}
index 7140652f5fd31963f1823cb1f1ffc4edeba8d489..c6f26716276c67f2a7f511fd760038aedbf57277 100644 (file)
@@ -42,18 +42,16 @@ type torrent struct {
        PiecesByBytesLeft *OrderedList
        Data              mmap_span.MMapSpan
        // Prevent mutations to Data memory maps while in use as they're not safe.
-       dataLock   sync.RWMutex
-       Info       *metainfo.Info
-       Conns      []*connection
-       Peers      []Peer
-       Priorities *list.List
+       dataLock sync.RWMutex
+       Info     *metainfo.Info
+       Conns    []*connection
+       Peers    []Peer
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list key.
-       Trackers      [][]tracker.Client
-       lastReadPiece int
-       DisplayName   string
-       MetaData      []byte
-       metadataHave  []bool
+       Trackers     [][]tracker.Client
+       DisplayName  string
+       MetaData     []byte
+       metadataHave []bool
 }
 
 func (t *torrent) InvalidateMetadata() {
@@ -124,7 +122,6 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
                piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
                t.pendAllChunkSpecs(pp.Integer(index))
        }
-       t.Priorities = list.New()
        for _, conn := range t.Conns {
                if err := conn.setNumPieces(t.NumPieces()); err != nil {
                        log.Printf("closing connection: %s", err)
@@ -211,12 +208,12 @@ func (t *torrent) WriteStatus(w io.Writer) {
                fmt.Fprintf(w, "%c", t.pieceStatusChar(index))
        }
        fmt.Fprintln(w)
-       fmt.Fprintln(w, "Priorities: ")
-       if t.Priorities != nil {
-               for e := t.Priorities.Front(); e != nil; e = e.Next() {
-                       fmt.Fprintf(w, "\t%v\n", e.Value)
-               }
-       }
+       // fmt.Fprintln(w, "Priorities: ")
+       // if t.Priorities != nil {
+       //      for e := t.Priorities.Front(); e != nil; e = e.Next() {
+       //              fmt.Fprintf(w, "\t%v\n", e.Value)
+       //      }
+       // }
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
        for _, c := range t.Conns {
                c.WriteStatus(w)
@@ -419,6 +416,14 @@ func (me *torrent) haveAnyPieces() bool {
        return false
 }
 
+func (t *torrent) wantChunk(r request) bool {
+       if !t.wantPiece(int(r.Index)) {
+               return false
+       }
+       _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
+       return ok
+}
+
 func (t *torrent) wantPiece(index int) bool {
        if !t.haveInfo() {
                return false