From: Matt Joiner Date: Thu, 28 Aug 2014 00:04:00 +0000 (+1000) Subject: Great complexifying of the responsive download strategy X-Git-Tag: v1.0.0~1580 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=1507d803bd0f1703876a1fcb7c77bc23abecd4bb;p=btrtrc.git Great complexifying of the responsive download strategy Should be better after 4 days of experimentation... --- diff --git a/download_strategies.go b/download_strategies.go index 6b288611..a71618d0 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -1,8 +1,10 @@ package torrent import ( + "container/heap" "fmt" "io" + "math/rand" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" ) @@ -64,8 +66,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { for _, heatThreshold := range []int{1, 4, 15, 60} { for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { pieceIndex := pp.Integer(e.Value.(int)) - for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { - // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { + for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { r := request{pieceIndex, chunkSpec} if th[r] >= heatThreshold { continue @@ -113,6 +114,7 @@ func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy lastReadOffset: make(map[*torrent]int64), priorities: make(map[*torrent]map[request]struct{}), requestHeat: make(map[*torrent]map[request]int), + rand: rand.New(rand.NewSource(1337)), } } @@ -123,6 +125,7 @@ type responsiveDownloadStrategy struct { lastReadOffset map[*torrent]int64 priorities map[*torrent]map[request]struct{} requestHeat map[*torrent]map[request]int + rand *rand.Rand // Avoid global lock } func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) { @@ -153,93 +156,152 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) { rh[r]-- } -func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { - th := me.requestHeat[t] - requestWrapper := func(req request) bool { - if c.RequestPending(req) { - return true - } - again := c.Request(req) - if c.RequestPending(req) { - th[req]++ - } - return again +type requestFiller struct { + c *connection + t *torrent + s *responsiveDownloadStrategy +} + +// Wrapper around connection.request that tracks request heat. +func (me *requestFiller) request(req request) bool { + if me.c.RequestPending(req) { + return true + } + again := me.c.Request(req) + if me.c.RequestPending(req) { + me.s.requestHeat[me.t][req]++ + } + return again +} + +// Adds additional constraints around the request heat wrapper. +func (me *requestFiller) conservativelyRequest(req request) bool { + again := me.request(req) + if len(me.c.Requests) >= 50 { + return false } + return again +} - for req := range me.priorities[t] { - if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { +// Fill priority requests. +func (me *requestFiller) priorities() bool { + for req := range me.s.priorities[me.t] { + if _, ok := me.t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { panic(req) } - if !requestWrapper(req) { - return + if !me.request(req) { + return false } } + return true +} - if len(c.Requests) >= 16 { +// Fill requests, with all contextual information available in the receiver. +func (me requestFiller) Run() { + if !me.priorities() { return } + if len(me.c.Requests) > 25 { + return + } + if !me.readahead() { + return + } + if len(me.c.Requests) > 0 { + return + } + me.completePartial() +} - requestWrapper = func() func(request) bool { - f := requestWrapper - return func(req request) bool { - if len(c.Requests) >= 32 { - return false - } - return f(req) +// Request partial pieces that aren't in the readahead zone. +func (me *requestFiller) completePartial() bool { + t := me.t + th := me.s.requestHeat[t] + for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { + p := e.Value.(int) + if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() { + break } - }() - - if lastReadOffset, ok := me.lastReadOffset[t]; ok { - var nextAhead int64 - for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead { - off := lastReadOffset + ahead - req, ok := t.offsetRequest(off) - if !ok { - break - } - if !t.wantPiece(int(req.Index)) { - nextAhead = ahead + int64(t.PieceLength(req.Index)) - continue - } - nextAhead = ahead + int64(req.Length) - if !t.wantChunk(req) { - continue - } - if th[req] >= func() int { - // Determine allowed redundancy based on how far into the - // readahead zone we're looking. - if ahead >= (2*me.Readahead+2)/3 { - return 1 - } else if ahead >= (me.Readahead+2)/3 { - return 2 - } else { - return 3 + if lastReadOffset, ok := me.s.lastReadOffset[t]; ok { + if p >= int(lastReadOffset/int64(t.UsualPieceSize())) { + if int64(p+1)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead { + continue } - }() { + } + } + for chunkSpec := range t.Pieces[p].PendingChunkSpecs { + r := request{pp.Integer(p), chunkSpec} + if th[r] >= 1 { continue } - if !requestWrapper(req) { - return + if !me.conservativelyRequest(r) { + return false } } } + return true +} - // t.assertIncompletePiecesByBytesLeftOrdering() - for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { - p := e.Value.(int) - if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() { - break +// Returns all wanted chunk specs in the readahead zone. +func (me *requestFiller) pendingReadaheadChunks() (ret []request) { + t := me.t + lastReadOffset, ok := me.s.lastReadOffset[t] + if !ok { + return + } + ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize) + for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ { + if !t.wantPiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) { + continue } - for chunkSpec := range t.Pieces[p].PendingChunkSpecs { - r := request{pp.Integer(p), chunkSpec} - if th[r] >= 2 { + for cs := range t.Pieces[pi].PendingChunkSpecs { + r := request{pp.Integer(pi), cs} + if _, ok := me.c.Requests[r]; ok { continue } - if !requestWrapper(r) { - return + if off := t.requestOffset(r); off < lastReadOffset || off >= lastReadOffset+me.s.Readahead { + continue } + ret = append(ret, r) } } + return +} + +// Min-heap of int. +type intHeap []int + +func (h intHeap) Len() int { return len(h) } +func (h intHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) } +func (h *intHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func (me *requestFiller) readahead() bool { + rr := me.pendingReadaheadChunks() + if len(rr) == 0 { + return true + } + // Produce a partially sorted random permutation into the readahead chunks to somewhat preserve order but reducing wasted chunks due to overlap with other peers. + ii := new(intHeap) + *ii = me.s.rand.Perm(len(rr)) + heap.Init(ii) + for _, i := range *ii { + if !me.conservativelyRequest(rr[i]) { + return false + } + } + return true +} + +func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { + (requestFiller{c, t, me}).Run() } func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {