From: Matt Joiner <anacrolix@gmail.com>
Date: Thu, 7 Oct 2021 06:31:10 +0000 (+1100)
Subject: Change peer requesting to spread requests out evenly
X-Git-Tag: v1.34.0^2~59
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0f53cbf07e824d97d3b7f2f95932a62e61b151e7;p=btrtrc.git

Change peer requesting to spread requests out evenly
---

diff --git a/peerconn.go b/peerconn.go
index 749832ca..dc172e6d 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -662,9 +662,7 @@ func (cn *PeerConn) postBitfield() {
 
 func (cn *PeerConn) updateRequests() {
 	if peerRequesting {
-		if cn.actualRequestState.Requests.GetCardinality() != 0 {
-			return
-		}
+		cn.nextRequestState = cn.getDesiredRequestState()
 		cn.tickleWriter()
 		return
 	}
@@ -1320,7 +1318,9 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 	c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
 	if deletedRequest {
 		c.piecesReceivedSinceLastRequestUpdate++
-		c.updateRequests()
+		if c.nextRequestState.Requests.GetCardinality() == 0 {
+			c.updateRequests()
+		}
 		c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
 	}
 	for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
diff --git a/request-strategy/piece.go b/request-strategy/piece.go
index dfc2d928..8a038e67 100644
--- a/request-strategy/piece.go
+++ b/request-strategy/piece.go
@@ -3,7 +3,7 @@ package request_strategy
 type ChunksIterFunc func(func(ChunkIndex))
 
 type ChunksIter interface {
-	Iter(func(ChunkIndex))
+	Iter(func(ci ChunkIndex))
 }
 
 type Piece struct {
diff --git a/requesting.go b/requesting.go
index 82ad60de..4513ea3d 100644
--- a/requesting.go
+++ b/requesting.go
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+	"container/heap"
 	"encoding/gob"
 	"reflect"
 	"time"
@@ -10,12 +11,13 @@ import (
 	"github.com/anacrolix/chansync/events"
 	"github.com/anacrolix/log"
 	"github.com/anacrolix/missinggo/v2/bitmap"
+	"github.com/anacrolix/multiless"
 
 	request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
 // Calculate requests individually for each peer.
-const peerRequesting = false
+const peerRequesting = true
 
 func (cl *Client) requester() {
 	for {
@@ -160,78 +162,106 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee
 type RequestIndex = request_strategy.RequestIndex
 type chunkIndexType = request_strategy.ChunkIndex
 
-func (p *Peer) applyNextRequestState() bool {
-	if peerRequesting {
-		if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
-			return true
+type peerRequests struct {
+	requestIndexes       []RequestIndex
+	peer                 *Peer
+	torrentStrategyInput request_strategy.Torrent
+}
+
+func (p peerRequests) Len() int {
+	return len(p.requestIndexes)
+}
+
+func (p peerRequests) Less(i, j int) bool {
+	leftRequest := p.requestIndexes[i]
+	rightRequest := p.requestIndexes[j]
+	t := p.peer.t
+	leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
+	rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
+	leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
+	rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
+	pending := func(index RequestIndex, current bool) int {
+		ret := t.pendingRequests[index]
+		if current {
+			ret--
 		}
-		type piece struct {
-			index   int
-			endGame bool
+		return ret
+	}
+	ml := multiless.New()
+	ml = ml.Int(
+		pending(leftRequest, leftCurrent),
+		pending(rightRequest, rightCurrent))
+	ml = ml.Bool(rightCurrent, leftCurrent)
+	ml = ml.Int(
+		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
+		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
+	ml = ml.Int(
+		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
+		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
+	ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
+	ml = ml.Uint32(leftRequest, rightRequest)
+	return ml.MustLess()
+}
+
+func (p peerRequests) Swap(i, j int) {
+	p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
+}
+
+func (p *peerRequests) Push(x interface{}) {
+	p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
+}
+
+func (p *peerRequests) Pop() interface{} {
+	last := len(p.requestIndexes) - 1
+	x := p.requestIndexes[last]
+	p.requestIndexes = p.requestIndexes[:last]
+	return x
+}
+
+func (p *Peer) getDesiredRequestState() (desired requestState) {
+	input := p.t.cl.getRequestStrategyInput()
+	requestHeap := peerRequests{
+		requestIndexes: nil,
+		peer:           p,
+	}
+	for _, t := range input.Torrents {
+		if t.InfoHash == p.t.infoHash {
+			requestHeap.torrentStrategyInput = t
+			break
 		}
-		var pieceOrder []piece
-		request_strategy.GetRequestablePieces(
-			p.t.cl.getRequestStrategyInput(),
-			func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
-				if t.InfoHash != p.t.infoHash {
-					return
-				}
-				if !p.peerHasPiece(pieceIndex) {
-					return
-				}
-				pieceOrder = append(pieceOrder, piece{
-					index:   pieceIndex,
-					endGame: rsp.Priority == PiecePriorityNow,
-				})
-			},
-		)
-		more := true
-		interested := false
-		for _, endGameIter := range []bool{false, true} {
-			for _, piece := range pieceOrder {
-				tp := p.t.piece(piece.index)
-				tp.iterUndirtiedChunks(func(cs chunkIndexType) {
-					req := cs + tp.requestIndexOffset()
-					if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
-						return
-					}
-					interested = true
-					more = p.setInterested(true)
-					if !more {
-						return
-					}
-					if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-						return
-					}
-					if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
-						return
-					}
-					var err error
-					more, err = p.request(req)
-					if err != nil {
-						panic(err)
-					}
-				})
-				if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-					break
-				}
-				if !more {
-					break
-				}
+	}
+	request_strategy.GetRequestablePieces(
+		input,
+		func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
+			if t.InfoHash != p.t.infoHash {
+				return
 			}
-			if !more {
-				break
+			if !p.peerHasPiece(pieceIndex) {
+				return
 			}
+			rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
+				requestHeap.requestIndexes = append(
+					requestHeap.requestIndexes,
+					p.t.pieceRequestIndexOffset(pieceIndex)+ci)
+			})
+		},
+	)
+	heap.Init(&requestHeap)
+	for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
+		requestIndex := heap.Pop(&requestHeap).(RequestIndex)
+		pieceIndex := requestIndex / p.t.chunksPerRegularPiece()
+		allowedFast := p.peerAllowedFast.Contains(pieceIndex)
+		if !allowedFast {
+			desired.Interested = true
 		}
-		if !more {
-			return false
-		}
-		if !interested {
-			p.setInterested(false)
+		if allowedFast || !p.peerChoking {
+			desired.Requests.Add(requestIndex)
 		}
-		return more
 	}
+	return
+}
 
+func (p *Peer) applyNextRequestState() bool {
 	next := p.nextRequestState
 	current := p.actualRequestState
 	if !p.setInterested(next.Interested) {