]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Change peer requesting to spread requests out evenly
authorMatt Joiner <anacrolix@gmail.com>
Thu, 7 Oct 2021 06:31:10 +0000 (17:31 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 19 Oct 2021 03:08:13 +0000 (14:08 +1100)
peerconn.go
request-strategy/piece.go
requesting.go

index 749832ca14de9994b3f976afc0f6547d1bdcc6f5..dc172e6d336841faa38448716402d2be471aa07b 100644 (file)
@@ -662,9 +662,7 @@ func (cn *PeerConn) postBitfield() {
 
 func (cn *PeerConn) updateRequests() {
        if peerRequesting {
-               if cn.actualRequestState.Requests.GetCardinality() != 0 {
-                       return
-               }
+               cn.nextRequestState = cn.getDesiredRequestState()
                cn.tickleWriter()
                return
        }
@@ -1320,7 +1318,9 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        if deletedRequest {
                c.piecesReceivedSinceLastRequestUpdate++
-               c.updateRequests()
+               if c.nextRequestState.Requests.GetCardinality() == 0 {
+                       c.updateRequests()
+               }
                c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
        }
        for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
index dfc2d928a323710add9c010bbc66858a19246e24..8a038e67dafe8a9ea73531085e6057f0cf500077 100644 (file)
@@ -3,7 +3,7 @@ package request_strategy
 type ChunksIterFunc func(func(ChunkIndex))
 
 type ChunksIter interface {
-       Iter(func(ChunkIndex))
+       Iter(func(ci ChunkIndex))
 }
 
 type Piece struct {
index 82ad60de55f0386b478180138475b021e7d21c41..4513ea3dfbbc904d6917aca3dd957bd26f3d6b19 100644 (file)
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "container/heap"
        "encoding/gob"
        "reflect"
        "time"
@@ -10,12 +11,13 @@ import (
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/multiless"
 
        request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
 // Calculate requests individually for each peer.
-const peerRequesting = false
+const peerRequesting = true
 
 func (cl *Client) requester() {
        for {
@@ -160,78 +162,106 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee
 type RequestIndex = request_strategy.RequestIndex
 type chunkIndexType = request_strategy.ChunkIndex
 
-func (p *Peer) applyNextRequestState() bool {
-       if peerRequesting {
-               if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
-                       return true
+type peerRequests struct {
+       requestIndexes       []RequestIndex
+       peer                 *Peer
+       torrentStrategyInput request_strategy.Torrent
+}
+
+func (p peerRequests) Len() int {
+       return len(p.requestIndexes)
+}
+
+func (p peerRequests) Less(i, j int) bool {
+       leftRequest := p.requestIndexes[i]
+       rightRequest := p.requestIndexes[j]
+       t := p.peer.t
+       leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
+       rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
+       leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
+       rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
+       pending := func(index RequestIndex, current bool) int {
+               ret := t.pendingRequests[index]
+               if current {
+                       ret--
                }
-               type piece struct {
-                       index   int
-                       endGame bool
+               return ret
+       }
+       ml := multiless.New()
+       ml = ml.Int(
+               pending(leftRequest, leftCurrent),
+               pending(rightRequest, rightCurrent))
+       ml = ml.Bool(rightCurrent, leftCurrent)
+       ml = ml.Int(
+               int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
+               int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
+       ml = ml.Int(
+               int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
+               int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
+       ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
+       ml = ml.Uint32(leftRequest, rightRequest)
+       return ml.MustLess()
+}
+
+func (p peerRequests) Swap(i, j int) {
+       p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
+}
+
+func (p *peerRequests) Push(x interface{}) {
+       p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
+}
+
+func (p *peerRequests) Pop() interface{} {
+       last := len(p.requestIndexes) - 1
+       x := p.requestIndexes[last]
+       p.requestIndexes = p.requestIndexes[:last]
+       return x
+}
+
+func (p *Peer) getDesiredRequestState() (desired requestState) {
+       input := p.t.cl.getRequestStrategyInput()
+       requestHeap := peerRequests{
+               requestIndexes: nil,
+               peer:           p,
+       }
+       for _, t := range input.Torrents {
+               if t.InfoHash == p.t.infoHash {
+                       requestHeap.torrentStrategyInput = t
+                       break
                }
-               var pieceOrder []piece
-               request_strategy.GetRequestablePieces(
-                       p.t.cl.getRequestStrategyInput(),
-                       func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
-                               if t.InfoHash != p.t.infoHash {
-                                       return
-                               }
-                               if !p.peerHasPiece(pieceIndex) {
-                                       return
-                               }
-                               pieceOrder = append(pieceOrder, piece{
-                                       index:   pieceIndex,
-                                       endGame: rsp.Priority == PiecePriorityNow,
-                               })
-                       },
-               )
-               more := true
-               interested := false
-               for _, endGameIter := range []bool{false, true} {
-                       for _, piece := range pieceOrder {
-                               tp := p.t.piece(piece.index)
-                               tp.iterUndirtiedChunks(func(cs chunkIndexType) {
-                                       req := cs + tp.requestIndexOffset()
-                                       if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
-                                               return
-                                       }
-                                       interested = true
-                                       more = p.setInterested(true)
-                                       if !more {
-                                               return
-                                       }
-                                       if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-                                               return
-                                       }
-                                       if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
-                                               return
-                                       }
-                                       var err error
-                                       more, err = p.request(req)
-                                       if err != nil {
-                                               panic(err)
-                                       }
-                               })
-                               if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-                                       break
-                               }
-                               if !more {
-                                       break
-                               }
+       }
+       request_strategy.GetRequestablePieces(
+               input,
+               func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
+                       if t.InfoHash != p.t.infoHash {
+                               return
                        }
-                       if !more {
-                               break
+                       if !p.peerHasPiece(pieceIndex) {
+                               return
                        }
+                       rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
+                               requestHeap.requestIndexes = append(
+                                       requestHeap.requestIndexes,
+                                       p.t.pieceRequestIndexOffset(pieceIndex)+ci)
+                       })
+               },
+       )
+       heap.Init(&requestHeap)
+       for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
+               requestIndex := heap.Pop(&requestHeap).(RequestIndex)
+               pieceIndex := requestIndex / p.t.chunksPerRegularPiece()
+               allowedFast := p.peerAllowedFast.Contains(pieceIndex)
+               if !allowedFast {
+                       desired.Interested = true
                }
-               if !more {
-                       return false
-               }
-               if !interested {
-                       p.setInterested(false)
+               if allowedFast || !p.peerChoking {
+                       desired.Requests.Add(requestIndex)
                }
-               return more
        }
+       return
+}
 
+func (p *Peer) applyNextRequestState() bool {
        next := p.nextRequestState
        current := p.actualRequestState
        if !p.setInterested(next.Interested) {