package torrent
import (
+ "container/heap"
"fmt"
"io"
+ "math/rand"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
)
for _, heatThreshold := range []int{1, 4, 15, 60} {
for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
pieceIndex := pp.Integer(e.Value.(int))
- for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
- // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
+ for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
r := request{pieceIndex, chunkSpec}
if th[r] >= heatThreshold {
continue
lastReadOffset: make(map[*torrent]int64),
priorities: make(map[*torrent]map[request]struct{}),
requestHeat: make(map[*torrent]map[request]int),
+ rand: rand.New(rand.NewSource(1337)),
}
}
lastReadOffset map[*torrent]int64
priorities map[*torrent]map[request]struct{}
requestHeat map[*torrent]map[request]int
+ rand *rand.Rand // Avoid global lock
}
func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
rh[r]--
}
-func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
- th := me.requestHeat[t]
- requestWrapper := func(req request) bool {
- if c.RequestPending(req) {
- return true
- }
- again := c.Request(req)
- if c.RequestPending(req) {
- th[req]++
- }
- return again
+type requestFiller struct {
+ c *connection
+ t *torrent
+ s *responsiveDownloadStrategy
+}
+
+// Wrapper around connection.request that tracks request heat.
+func (me *requestFiller) request(req request) bool {
+ if me.c.RequestPending(req) {
+ return true
+ }
+ again := me.c.Request(req)
+ if me.c.RequestPending(req) {
+ me.s.requestHeat[me.t][req]++
+ }
+ return again
+}
+
+// Adds additional constraints around the request heat wrapper.
+func (me *requestFiller) conservativelyRequest(req request) bool {
+ again := me.request(req)
+ if len(me.c.Requests) >= 50 {
+ return false
}
+ return again
+}
- for req := range me.priorities[t] {
- if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+// Fill priority requests.
+func (me *requestFiller) priorities() bool {
+ for req := range me.s.priorities[me.t] {
+ if _, ok := me.t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
panic(req)
}
- if !requestWrapper(req) {
- return
+ if !me.request(req) {
+ return false
}
}
+ return true
+}
- if len(c.Requests) >= 16 {
+// Fill requests, with all contextual information available in the receiver.
+func (me requestFiller) Run() {
+ if !me.priorities() {
return
}
+ if len(me.c.Requests) > 25 {
+ return
+ }
+ if !me.readahead() {
+ return
+ }
+ if len(me.c.Requests) > 0 {
+ return
+ }
+ me.completePartial()
+}
- requestWrapper = func() func(request) bool {
- f := requestWrapper
- return func(req request) bool {
- if len(c.Requests) >= 32 {
- return false
- }
- return f(req)
+// Request partial pieces that aren't in the readahead zone.
+func (me *requestFiller) completePartial() bool {
+ t := me.t
+ th := me.s.requestHeat[t]
+ for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+ p := e.Value.(int)
+ if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
+ break
}
- }()
-
- if lastReadOffset, ok := me.lastReadOffset[t]; ok {
- var nextAhead int64
- for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
- off := lastReadOffset + ahead
- req, ok := t.offsetRequest(off)
- if !ok {
- break
- }
- if !t.wantPiece(int(req.Index)) {
- nextAhead = ahead + int64(t.PieceLength(req.Index))
- continue
- }
- nextAhead = ahead + int64(req.Length)
- if !t.wantChunk(req) {
- continue
- }
- if th[req] >= func() int {
- // Determine allowed redundancy based on how far into the
- // readahead zone we're looking.
- if ahead >= (2*me.Readahead+2)/3 {
- return 1
- } else if ahead >= (me.Readahead+2)/3 {
- return 2
- } else {
- return 3
+ if lastReadOffset, ok := me.s.lastReadOffset[t]; ok {
+ if p >= int(lastReadOffset/int64(t.UsualPieceSize())) {
+ if int64(p+1)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead {
+ continue
}
- }() {
+ }
+ }
+ for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
+ r := request{pp.Integer(p), chunkSpec}
+ if th[r] >= 1 {
continue
}
- if !requestWrapper(req) {
- return
+ if !me.conservativelyRequest(r) {
+ return false
}
}
}
+ return true
+}
- // t.assertIncompletePiecesByBytesLeftOrdering()
- for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
- p := e.Value.(int)
- if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
- break
+// Returns all wanted chunk specs in the readahead zone.
+func (me *requestFiller) pendingReadaheadChunks() (ret []request) {
+ t := me.t
+ lastReadOffset, ok := me.s.lastReadOffset[t]
+ if !ok {
+ return
+ }
+ ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize)
+ for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ {
+ if !t.wantPiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) {
+ continue
}
- for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
- r := request{pp.Integer(p), chunkSpec}
- if th[r] >= 2 {
+ for cs := range t.Pieces[pi].PendingChunkSpecs {
+ r := request{pp.Integer(pi), cs}
+ if _, ok := me.c.Requests[r]; ok {
continue
}
- if !requestWrapper(r) {
- return
+ if off := t.requestOffset(r); off < lastReadOffset || off >= lastReadOffset+me.s.Readahead {
+ continue
}
+ ret = append(ret, r)
}
}
+ return
+}
+
+// Min-heap of int.
+type intHeap []int
+
+func (h intHeap) Len() int { return len(h) }
+func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
+func (h *intHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+func (me *requestFiller) readahead() bool {
+ rr := me.pendingReadaheadChunks()
+ if len(rr) == 0 {
+ return true
+ }
+ // Produce a partially sorted random permutation into the readahead chunks to somewhat preserve order but reducing wasted chunks due to overlap with other peers.
+ ii := new(intHeap)
+ *ii = me.s.rand.Perm(len(rr))
+ heap.Init(ii)
+ for _, i := range *ii {
+ if !me.conservativelyRequest(rr[i]) {
+ return false
+ }
+ }
+ return true
+}
+
+func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+ (requestFiller{c, t, me}).Run()
}
func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {