]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Get request rebalancing working optimally!
authorMatt Joiner <anacrolix@gmail.com>
Thu, 13 May 2021 10:56:12 +0000 (20:56 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
request-strategy/order.go
request-strategy/order_test.go
request-strategy/piece.go

index da6890e583fd51c4b5e12b1578518ab389e6bafa..c16cb3ea9c533244f8f5080ec6761187c83432cf 100644 (file)
@@ -1,7 +1,6 @@
 package request_strategy
 
 import (
-       "math"
        "sort"
 
        "github.com/anacrolix/multiless"
@@ -55,14 +54,12 @@ func (rp *requestsPeer) canFitRequest() bool {
        return len(rp.nextState.Requests) < rp.MaxRequests
 }
 
-// Returns true if it is added and wasn't there before.
-func (rp *requestsPeer) addNextRequest(r Request) bool {
+func (rp *requestsPeer) addNextRequest(r Request) {
        _, ok := rp.nextState.Requests[r]
        if ok {
-               return false
+               panic("should only add once")
        }
        rp.nextState.Requests[r] = struct{}{}
-       return true
 }
 
 type peersForPieceRequests struct {
@@ -71,9 +68,8 @@ type peersForPieceRequests struct {
 }
 
 func (me *peersForPieceRequests) addNextRequest(r Request) {
-       if me.requestsPeer.addNextRequest(r) {
-               me.requestsInPiece++
-       }
+       me.requestsPeer.addNextRequest(r)
+       me.requestsInPiece++
 }
 
 type Torrent struct {
@@ -155,9 +151,16 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
                        requestsPeer:    peer,
                })
        }
-       sortPeersForPiece := func() {
+       defer func() {
+               for _, peer := range peersForPiece {
+                       if peer.canRequestPiece(p.index) {
+                               peer.requestablePiecesRemaining--
+                       }
+               }
+       }()
+       sortPeersForPiece := func(byHasRequest *Request) {
                sort.Slice(peersForPiece, func(i, j int) bool {
-                       return multiless.New().Int(
+                       ml := multiless.New().Int(
                                peersForPiece[i].requestsInPiece,
                                peersForPiece[j].requestsInPiece,
                        ).Int(
@@ -166,7 +169,13 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
                        ).Float64(
                                peersForPiece[j].DownloadRate,
                                peersForPiece[i].DownloadRate,
-                       ).Int64(
+                       )
+                       if byHasRequest != nil {
+                               _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
+                               _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
+                               ml = ml.Bool(jHas, iHas)
+                       }
+                       return ml.Int64(
                                int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
                                // TODO: Probably peer priority can come next
                        ).Uintptr(
@@ -175,65 +184,68 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
                        ).MustLess()
                })
        }
+       preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
+       p.iterPendingChunksWrapper(func(spec ChunkSpec) {
+               req := Request{pp.Integer(p.index), spec}
+               for _, p := range peersForPiece {
+                       if h := p.HasExistingRequest; h != nil && h(req) {
+                               preallocated[spec] = p
+                               p.addNextRequest(req)
+                       }
+               }
+       })
        pendingChunksRemaining := int(p.NumPendingChunks)
-       if f := p.IterPendingChunks; f != nil {
-               f(func(chunk types.ChunkSpec) {
-                       req := Request{pp.Integer(p.index), chunk}
-                       defer func() { pendingChunksRemaining-- }()
-                       sortPeersForPiece()
-                       skipped := 0
-                       // Try up to the number of peers that could legitimately receive the request equal to
-                       // the number of chunks left. This should ensure that only the best peers serve the last
-                       // few chunks in a piece.
-                       lowestNumRequestsInPiece := math.MaxInt16
-                       for _, peer := range peersForPiece {
-                               if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
-                                       continue
-                               }
-                               if skipped+1 >= pendingChunksRemaining {
-                                       break
-                               }
-                               if f := peer.HasExistingRequest; f == nil || !f(req) {
-                                       skipped++
-                                       lowestNumRequestsInPiece = peer.requestsInPiece
-                                       continue
-                               }
-                               if peer.requestsInPiece > lowestNumRequestsInPiece {
-                                       break
-                               }
-                               if !peer.pieceAllowedFastOrDefault(p.index) {
-                                       // We must stay interested for this.
-                                       peer.nextState.Interested = true
-                               }
-                               peer.addNextRequest(req)
-                               return
+       p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) {
+               if _, ok := preallocated[chunk]; ok {
+                       return
+               }
+               req := Request{pp.Integer(p.index), chunk}
+               defer func() { pendingChunksRemaining-- }()
+               sortPeersForPiece(nil)
+               for _, peer := range peersForPiece {
+                       if !peer.canFitRequest() {
+                               continue
                        }
-                       for _, peer := range peersForPiece {
-                               if !peer.canFitRequest() {
+                       if !peer.HasPiece(p.index) {
+                               continue
+                       }
+                       if !peer.pieceAllowedFastOrDefault(p.index) {
+                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
+                               peer.nextState.Interested = true
+                               if peer.Choking {
                                        continue
                                }
-                               if !peer.HasPiece(p.index) {
+                       }
+                       peer.addNextRequest(req)
+                       return
+               }
+       })
+chunk:
+       for chunk, prePeer := range preallocated {
+               req := Request{pp.Integer(p.index), chunk}
+               prePeer.requestsInPiece--
+               sortPeersForPiece(&req)
+               delete(prePeer.nextState.Requests, req)
+               for _, peer := range peersForPiece {
+                       if !peer.canFitRequest() {
+                               continue
+                       }
+                       if !peer.HasPiece(p.index) {
+                               continue
+                       }
+                       if !peer.pieceAllowedFastOrDefault(p.index) {
+                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
+                               peer.nextState.Interested = true
+                               if peer.Choking {
                                        continue
                                }
-                               if !peer.pieceAllowedFastOrDefault(p.index) {
-                                       // TODO: Verify that's okay to stay uninterested if we request allowed fast
-                                       // pieces.
-                                       peer.nextState.Interested = true
-                                       if peer.Choking {
-                                               continue
-                                       }
-                               }
-                               peer.addNextRequest(req)
-                               return
                        }
-               })
+                       pendingChunksRemaining--
+                       peer.addNextRequest(req)
+                       continue chunk
+               }
        }
        if pendingChunksRemaining != 0 {
                panic(pendingChunksRemaining)
        }
-       for _, peer := range peersForPiece {
-               if peer.canRequestPiece(p.index) {
-                       peer.requestablePiecesRemaining--
-               }
-       }
 }
index 2096140386078b533ffaf2fa2f3c14d163f238e1..213b6f52674e27457ec0e49d2ed48c127e3b4e58 100644 (file)
@@ -5,6 +5,7 @@ import (
        "testing"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/bradfitz/iter"
        qt "github.com/frankban/quicktest"
 )
 
@@ -12,6 +13,14 @@ func r(i pieceIndex, begin int) Request {
        return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
 }
 
+func chunkIterRange(end int) func(func(ChunkSpec)) {
+       return func(f func(ChunkSpec)) {
+               for offset := range iter.N(end) {
+                       f(ChunkSpec{pp.Integer(offset), 1})
+               }
+       }
+}
+
 func chunkIter(offsets ...int) func(func(ChunkSpec)) {
        return func(f func(ChunkSpec)) {
                for _, offset := range offsets {
@@ -59,7 +68,7 @@ func TestStealingFromSlowerPeer(t *testing.T) {
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  5,
-                       IterPendingChunks: chunkIter(0, 1, 2, 3, 4),
+                       IterPendingChunks: chunkIterRange(5),
                }},
                Peers: []Peer{
                        stealee,
@@ -170,3 +179,51 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
                Requests:   requestSetFromSlice(keepReq),
        })
 }
+
+func TestDontStealUnnecessarily(t *testing.T) {
+       c := qt.New(t)
+       order := ClientPieceOrder{}
+       basePeer := Peer{
+               HasPiece: func(i pieceIndex) bool {
+                       return true
+               },
+               MaxRequests:  math.MaxInt16,
+               DownloadRate: 2,
+       }
+       // Slower than the stealers, but has all requests already.
+       stealee := basePeer
+       stealee.DownloadRate = 1
+       keepReqs := requestSetFromSlice(r(0, 0), r(0, 1), r(0, 2))
+       stealee.HasExistingRequest = func(r Request) bool {
+               _, ok := keepReqs[r]
+               return ok
+       }
+       stealee.Id = intPeerId(1)
+       firstStealer := basePeer
+       firstStealer.Id = intPeerId(2)
+       secondStealer := basePeer
+       secondStealer.Id = intPeerId(3)
+       results := order.DoRequests([]*Torrent{{
+               Pieces: []Piece{{
+                       Request:           true,
+                       NumPendingChunks:  9,
+                       IterPendingChunks: chunkIterRange(9),
+               }},
+               Peers: []Peer{
+                       firstStealer,
+                       stealee,
+                       secondStealer,
+               },
+       }})
+       c.Assert(results, qt.HasLen, 3)
+       check := func(p PeerId, l int) {
+               c.Check(results[p].Requests, qt.HasLen, l)
+               c.Check(results[p].Interested, qt.Equals, l > 0)
+       }
+       check(firstStealer.Id, 3)
+       check(secondStealer.Id, 3)
+       c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
+               Interested: true,
+               Requests:   keepReqs,
+       })
+}
index ec778a8c4f2f35a2a9dbd38b5cb056e6654e2610..508ed8294fed2743b2d0f22727ce11c15ca5fa30 100644 (file)
@@ -13,3 +13,10 @@ type Piece struct {
        NumPendingChunks  int
        IterPendingChunks func(func(types.ChunkSpec))
 }
+
+func (p *Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
+       i := p.IterPendingChunks
+       if i != nil {
+               i(f)
+       }
+}