From 0f53cbf07e824d97d3b7f2f95932a62e61b151e7 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 7 Oct 2021 17:31:10 +1100 Subject: [PATCH] Change peer requesting to spread requests out evenly --- peerconn.go | 8 +- request-strategy/piece.go | 2 +- requesting.go | 160 ++++++++++++++++++++++---------------- 3 files changed, 100 insertions(+), 70 deletions(-) diff --git a/peerconn.go b/peerconn.go index 749832ca..dc172e6d 100644 --- a/peerconn.go +++ b/peerconn.go @@ -662,9 +662,7 @@ func (cn *PeerConn) postBitfield() { func (cn *PeerConn) updateRequests() { if peerRequesting { - if cn.actualRequestState.Requests.GetCardinality() != 0 { - return - } + cn.nextRequestState = cn.getDesiredRequestState() cn.tickleWriter() return } @@ -1320,7 +1318,9 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if deletedRequest { c.piecesReceivedSinceLastRequestUpdate++ - c.updateRequests() + if c.nextRequestState.Requests.GetCardinality() == 0 { + c.updateRequests() + } c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { diff --git a/request-strategy/piece.go b/request-strategy/piece.go index dfc2d928..8a038e67 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -3,7 +3,7 @@ package request_strategy type ChunksIterFunc func(func(ChunkIndex)) type ChunksIter interface { - Iter(func(ChunkIndex)) + Iter(func(ci ChunkIndex)) } type Piece struct { diff --git a/requesting.go b/requesting.go index 82ad60de..4513ea3d 100644 --- a/requesting.go +++ b/requesting.go @@ -1,6 +1,7 @@ package torrent import ( + "container/heap" "encoding/gob" "reflect" "time" @@ -10,12 +11,13 @@ import ( "github.com/anacrolix/chansync/events" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/multiless" request_strategy "github.com/anacrolix/torrent/request-strategy" ) // Calculate requests individually for each peer. -const peerRequesting = false +const peerRequesting = true func (cl *Client) requester() { for { @@ -160,78 +162,106 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee type RequestIndex = request_strategy.RequestIndex type chunkIndexType = request_strategy.ChunkIndex -func (p *Peer) applyNextRequestState() bool { - if peerRequesting { - if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) { - return true +type peerRequests struct { + requestIndexes []RequestIndex + peer *Peer + torrentStrategyInput request_strategy.Torrent +} + +func (p peerRequests) Len() int { + return len(p.requestIndexes) +} + +func (p peerRequests) Less(i, j int) bool { + leftRequest := p.requestIndexes[i] + rightRequest := p.requestIndexes[j] + t := p.peer.t + leftPieceIndex := leftRequest / t.chunksPerRegularPiece() + rightPieceIndex := rightRequest / t.chunksPerRegularPiece() + leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) + rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) + pending := func(index RequestIndex, current bool) int { + ret := t.pendingRequests[index] + if current { + ret-- } - type piece struct { - index int - endGame bool + return ret + } + ml := multiless.New() + ml = ml.Int( + pending(leftRequest, leftCurrent), + pending(rightRequest, rightCurrent)) + ml = ml.Bool(rightCurrent, leftCurrent) + ml = ml.Int( + int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority), + int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority)) + ml = ml.Int( + int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability), + int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability)) + ml = ml.Uint32(leftPieceIndex, rightPieceIndex) + ml = ml.Uint32(leftRequest, rightRequest) + return ml.MustLess() +} + +func (p peerRequests) Swap(i, j int) { + p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i] +} + +func (p *peerRequests) Push(x interface{}) { + p.requestIndexes = append(p.requestIndexes, x.(RequestIndex)) +} + +func (p *peerRequests) Pop() interface{} { + last := len(p.requestIndexes) - 1 + x := p.requestIndexes[last] + p.requestIndexes = p.requestIndexes[:last] + return x +} + +func (p *Peer) getDesiredRequestState() (desired requestState) { + input := p.t.cl.getRequestStrategyInput() + requestHeap := peerRequests{ + requestIndexes: nil, + peer: p, + } + for _, t := range input.Torrents { + if t.InfoHash == p.t.infoHash { + requestHeap.torrentStrategyInput = t + break } - var pieceOrder []piece - request_strategy.GetRequestablePieces( - p.t.cl.getRequestStrategyInput(), - func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { - if t.InfoHash != p.t.infoHash { - return - } - if !p.peerHasPiece(pieceIndex) { - return - } - pieceOrder = append(pieceOrder, piece{ - index: pieceIndex, - endGame: rsp.Priority == PiecePriorityNow, - }) - }, - ) - more := true - interested := false - for _, endGameIter := range []bool{false, true} { - for _, piece := range pieceOrder { - tp := p.t.piece(piece.index) - tp.iterUndirtiedChunks(func(cs chunkIndexType) { - req := cs + tp.requestIndexOffset() - if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 { - return - } - interested = true - more = p.setInterested(true) - if !more { - return - } - if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() { - return - } - if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) { - return - } - var err error - more, err = p.request(req) - if err != nil { - panic(err) - } - }) - if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() { - break - } - if !more { - break - } + } + request_strategy.GetRequestablePieces( + input, + func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { + if t.InfoHash != p.t.infoHash { + return } - if !more { - break + if !p.peerHasPiece(pieceIndex) { + return } + rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { + requestHeap.requestIndexes = append( + requestHeap.requestIndexes, + p.t.pieceRequestIndexOffset(pieceIndex)+ci) + }) + }, + ) + heap.Init(&requestHeap) + for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) { + requestIndex := heap.Pop(&requestHeap).(RequestIndex) + pieceIndex := requestIndex / p.t.chunksPerRegularPiece() + allowedFast := p.peerAllowedFast.Contains(pieceIndex) + if !allowedFast { + desired.Interested = true } - if !more { - return false - } - if !interested { - p.setInterested(false) + if allowedFast || !p.peerChoking { + desired.Requests.Add(requestIndex) } - return more } + return +} +func (p *Peer) applyNextRequestState() bool { next := p.nextRequestState current := p.actualRequestState if !p.setInterested(next.Interested) { -- 2.48.1