]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Great complexifying of the responsive download strategy
authorMatt Joiner <anacrolix@gmail.com>
Thu, 28 Aug 2014 00:04:00 +0000 (10:04 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 28 Aug 2014 00:04:00 +0000 (10:04 +1000)
Should be better after 4 days of experimentation...

download_strategies.go

index 6b2886118d504d73e7d1393b0c1ec9da790972ed..a71618d0eec848bf8ec5b9e75e2355b0714743a7 100644 (file)
@@ -1,8 +1,10 @@
 package torrent
 
 import (
+       "container/heap"
        "fmt"
        "io"
+       "math/rand"
 
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
 )
@@ -64,8 +66,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
        for _, heatThreshold := range []int{1, 4, 15, 60} {
                for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
                        pieceIndex := pp.Integer(e.Value.(int))
-                       for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
-                               // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
+                       for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
                                r := request{pieceIndex, chunkSpec}
                                if th[r] >= heatThreshold {
                                        continue
@@ -113,6 +114,7 @@ func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy
                lastReadOffset: make(map[*torrent]int64),
                priorities:     make(map[*torrent]map[request]struct{}),
                requestHeat:    make(map[*torrent]map[request]int),
+               rand:           rand.New(rand.NewSource(1337)),
        }
 }
 
@@ -123,6 +125,7 @@ type responsiveDownloadStrategy struct {
        lastReadOffset map[*torrent]int64
        priorities     map[*torrent]map[request]struct{}
        requestHeat    map[*torrent]map[request]int
+       rand           *rand.Rand // Avoid global lock
 }
 
 func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
@@ -153,93 +156,152 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
        rh[r]--
 }
 
-func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
-       th := me.requestHeat[t]
-       requestWrapper := func(req request) bool {
-               if c.RequestPending(req) {
-                       return true
-               }
-               again := c.Request(req)
-               if c.RequestPending(req) {
-                       th[req]++
-               }
-               return again
+type requestFiller struct {
+       c *connection
+       t *torrent
+       s *responsiveDownloadStrategy
+}
+
+// Wrapper around connection.request that tracks request heat.
+func (me *requestFiller) request(req request) bool {
+       if me.c.RequestPending(req) {
+               return true
+       }
+       again := me.c.Request(req)
+       if me.c.RequestPending(req) {
+               me.s.requestHeat[me.t][req]++
+       }
+       return again
+}
+
+// Adds additional constraints around the request heat wrapper.
+func (me *requestFiller) conservativelyRequest(req request) bool {
+       again := me.request(req)
+       if len(me.c.Requests) >= 50 {
+               return false
        }
+       return again
+}
 
-       for req := range me.priorities[t] {
-               if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+// Fill priority requests.
+func (me *requestFiller) priorities() bool {
+       for req := range me.s.priorities[me.t] {
+               if _, ok := me.t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
                        panic(req)
                }
-               if !requestWrapper(req) {
-                       return
+               if !me.request(req) {
+                       return false
                }
        }
+       return true
+}
 
-       if len(c.Requests) >= 16 {
+// Fill requests, with all contextual information available in the receiver.
+func (me requestFiller) Run() {
+       if !me.priorities() {
                return
        }
+       if len(me.c.Requests) > 25 {
+               return
+       }
+       if !me.readahead() {
+               return
+       }
+       if len(me.c.Requests) > 0 {
+               return
+       }
+       me.completePartial()
+}
 
-       requestWrapper = func() func(request) bool {
-               f := requestWrapper
-               return func(req request) bool {
-                       if len(c.Requests) >= 32 {
-                               return false
-                       }
-                       return f(req)
+// Request partial pieces that aren't in the readahead zone.
+func (me *requestFiller) completePartial() bool {
+       t := me.t
+       th := me.s.requestHeat[t]
+       for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+               p := e.Value.(int)
+               if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
+                       break
                }
-       }()
-
-       if lastReadOffset, ok := me.lastReadOffset[t]; ok {
-               var nextAhead int64
-               for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
-                       off := lastReadOffset + ahead
-                       req, ok := t.offsetRequest(off)
-                       if !ok {
-                               break
-                       }
-                       if !t.wantPiece(int(req.Index)) {
-                               nextAhead = ahead + int64(t.PieceLength(req.Index))
-                               continue
-                       }
-                       nextAhead = ahead + int64(req.Length)
-                       if !t.wantChunk(req) {
-                               continue
-                       }
-                       if th[req] >= func() int {
-                               // Determine allowed redundancy based on how far into the
-                               // readahead zone we're looking.
-                               if ahead >= (2*me.Readahead+2)/3 {
-                                       return 1
-                               } else if ahead >= (me.Readahead+2)/3 {
-                                       return 2
-                               } else {
-                                       return 3
+               if lastReadOffset, ok := me.s.lastReadOffset[t]; ok {
+                       if p >= int(lastReadOffset/int64(t.UsualPieceSize())) {
+                               if int64(p+1)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead {
+                                       continue
                                }
-                       }() {
+                       }
+               }
+               for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
+                       r := request{pp.Integer(p), chunkSpec}
+                       if th[r] >= 1 {
                                continue
                        }
-                       if !requestWrapper(req) {
-                               return
+                       if !me.conservativelyRequest(r) {
+                               return false
                        }
                }
        }
+       return true
+}
 
-       // t.assertIncompletePiecesByBytesLeftOrdering()
-       for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
-               p := e.Value.(int)
-               if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
-                       break
+// Returns all wanted chunk specs in the readahead zone.
+func (me *requestFiller) pendingReadaheadChunks() (ret []request) {
+       t := me.t
+       lastReadOffset, ok := me.s.lastReadOffset[t]
+       if !ok {
+               return
+       }
+       ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize)
+       for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ {
+               if !t.wantPiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) {
+                       continue
                }
-               for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
-                       r := request{pp.Integer(p), chunkSpec}
-                       if th[r] >= 2 {
+               for cs := range t.Pieces[pi].PendingChunkSpecs {
+                       r := request{pp.Integer(pi), cs}
+                       if _, ok := me.c.Requests[r]; ok {
                                continue
                        }
-                       if !requestWrapper(r) {
-                               return
+                       if off := t.requestOffset(r); off < lastReadOffset || off >= lastReadOffset+me.s.Readahead {
+                               continue
                        }
+                       ret = append(ret, r)
                }
        }
+       return
+}
+
+// Min-heap of int.
+type intHeap []int
+
+func (h intHeap) Len() int            { return len(h) }
+func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
+func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
+func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
+func (h *intHeap) Pop() interface{} {
+       old := *h
+       n := len(old)
+       x := old[n-1]
+       *h = old[0 : n-1]
+       return x
+}
+
+func (me *requestFiller) readahead() bool {
+       rr := me.pendingReadaheadChunks()
+       if len(rr) == 0 {
+               return true
+       }
+       // Produce a partially sorted random permutation into the readahead chunks to somewhat preserve order but reducing wasted chunks due to overlap with other peers.
+       ii := new(intHeap)
+       *ii = me.s.rand.Perm(len(rr))
+       heap.Init(ii)
+       for _, i := range *ii {
+               if !me.conservativelyRequest(rr[i]) {
+                       return false
+               }
+       }
+       return true
+}
+
+func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+       (requestFiller{c, t, me}).Run()
 }
 
 func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {