From b80e2b08b5b846882798cd4981f0e9b28a17fb40 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 13 May 2021 20:56:12 +1000 Subject: [PATCH] Get request rebalancing working optimally! --- request-strategy/order.go | 134 ++++++++++++++++++--------------- request-strategy/order_test.go | 59 ++++++++++++++- request-strategy/piece.go | 7 ++ 3 files changed, 138 insertions(+), 62 deletions(-) diff --git a/request-strategy/order.go b/request-strategy/order.go index da6890e5..c16cb3ea 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -1,7 +1,6 @@ package request_strategy import ( - "math" "sort" "github.com/anacrolix/multiless" @@ -55,14 +54,12 @@ func (rp *requestsPeer) canFitRequest() bool { return len(rp.nextState.Requests) < rp.MaxRequests } -// Returns true if it is added and wasn't there before. -func (rp *requestsPeer) addNextRequest(r Request) bool { +func (rp *requestsPeer) addNextRequest(r Request) { _, ok := rp.nextState.Requests[r] if ok { - return false + panic("should only add once") } rp.nextState.Requests[r] = struct{}{} - return true } type peersForPieceRequests struct { @@ -71,9 +68,8 @@ type peersForPieceRequests struct { } func (me *peersForPieceRequests) addNextRequest(r Request) { - if me.requestsPeer.addNextRequest(r) { - me.requestsInPiece++ - } + me.requestsPeer.addNextRequest(r) + me.requestsInPiece++ } type Torrent struct { @@ -155,9 +151,16 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { requestsPeer: peer, }) } - sortPeersForPiece := func() { + defer func() { + for _, peer := range peersForPiece { + if peer.canRequestPiece(p.index) { + peer.requestablePiecesRemaining-- + } + } + }() + sortPeersForPiece := func(byHasRequest *Request) { sort.Slice(peersForPiece, func(i, j int) bool { - return multiless.New().Int( + ml := multiless.New().Int( peersForPiece[i].requestsInPiece, peersForPiece[j].requestsInPiece, ).Int( @@ -166,7 +169,13 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { ).Float64( peersForPiece[j].DownloadRate, peersForPiece[i].DownloadRate, - ).Int64( + ) + if byHasRequest != nil { + _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest] + _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest] + ml = ml.Bool(jHas, iHas) + } + return ml.Int64( int64(peersForPiece[j].Age), int64(peersForPiece[i].Age), // TODO: Probably peer priority can come next ).Uintptr( @@ -175,65 +184,68 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { ).MustLess() }) } + preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks) + p.iterPendingChunksWrapper(func(spec ChunkSpec) { + req := Request{pp.Integer(p.index), spec} + for _, p := range peersForPiece { + if h := p.HasExistingRequest; h != nil && h(req) { + preallocated[spec] = p + p.addNextRequest(req) + } + } + }) pendingChunksRemaining := int(p.NumPendingChunks) - if f := p.IterPendingChunks; f != nil { - f(func(chunk types.ChunkSpec) { - req := Request{pp.Integer(p.index), chunk} - defer func() { pendingChunksRemaining-- }() - sortPeersForPiece() - skipped := 0 - // Try up to the number of peers that could legitimately receive the request equal to - // the number of chunks left. This should ensure that only the best peers serve the last - // few chunks in a piece. - lowestNumRequestsInPiece := math.MaxInt16 - for _, peer := range peersForPiece { - if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) { - continue - } - if skipped+1 >= pendingChunksRemaining { - break - } - if f := peer.HasExistingRequest; f == nil || !f(req) { - skipped++ - lowestNumRequestsInPiece = peer.requestsInPiece - continue - } - if peer.requestsInPiece > lowestNumRequestsInPiece { - break - } - if !peer.pieceAllowedFastOrDefault(p.index) { - // We must stay interested for this. - peer.nextState.Interested = true - } - peer.addNextRequest(req) - return + p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) { + if _, ok := preallocated[chunk]; ok { + return + } + req := Request{pp.Integer(p.index), chunk} + defer func() { pendingChunksRemaining-- }() + sortPeersForPiece(nil) + for _, peer := range peersForPiece { + if !peer.canFitRequest() { + continue } - for _, peer := range peersForPiece { - if !peer.canFitRequest() { + if !peer.HasPiece(p.index) { + continue + } + if !peer.pieceAllowedFastOrDefault(p.index) { + // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. + peer.nextState.Interested = true + if peer.Choking { continue } - if !peer.HasPiece(p.index) { + } + peer.addNextRequest(req) + return + } + }) +chunk: + for chunk, prePeer := range preallocated { + req := Request{pp.Integer(p.index), chunk} + prePeer.requestsInPiece-- + sortPeersForPiece(&req) + delete(prePeer.nextState.Requests, req) + for _, peer := range peersForPiece { + if !peer.canFitRequest() { + continue + } + if !peer.HasPiece(p.index) { + continue + } + if !peer.pieceAllowedFastOrDefault(p.index) { + // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. + peer.nextState.Interested = true + if peer.Choking { continue } - if !peer.pieceAllowedFastOrDefault(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast - // pieces. - peer.nextState.Interested = true - if peer.Choking { - continue - } - } - peer.addNextRequest(req) - return } - }) + pendingChunksRemaining-- + peer.addNextRequest(req) + continue chunk + } } if pendingChunksRemaining != 0 { panic(pendingChunksRemaining) } - for _, peer := range peersForPiece { - if peer.canRequestPiece(p.index) { - peer.requestablePiecesRemaining-- - } - } } diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index 20961403..213b6f52 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -5,6 +5,7 @@ import ( "testing" pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/bradfitz/iter" qt "github.com/frankban/quicktest" ) @@ -12,6 +13,14 @@ func r(i pieceIndex, begin int) Request { return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}} } +func chunkIterRange(end int) func(func(ChunkSpec)) { + return func(f func(ChunkSpec)) { + for offset := range iter.N(end) { + f(ChunkSpec{pp.Integer(offset), 1}) + } + } +} + func chunkIter(offsets ...int) func(func(ChunkSpec)) { return func(f func(ChunkSpec)) { for _, offset := range offsets { @@ -59,7 +68,7 @@ func TestStealingFromSlowerPeer(t *testing.T) { Pieces: []Piece{{ Request: true, NumPendingChunks: 5, - IterPendingChunks: chunkIter(0, 1, 2, 3, 4), + IterPendingChunks: chunkIterRange(5), }}, Peers: []Peer{ stealee, @@ -170,3 +179,51 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { Requests: requestSetFromSlice(keepReq), }) } + +func TestDontStealUnnecessarily(t *testing.T) { + c := qt.New(t) + order := ClientPieceOrder{} + basePeer := Peer{ + HasPiece: func(i pieceIndex) bool { + return true + }, + MaxRequests: math.MaxInt16, + DownloadRate: 2, + } + // Slower than the stealers, but has all requests already. + stealee := basePeer + stealee.DownloadRate = 1 + keepReqs := requestSetFromSlice(r(0, 0), r(0, 1), r(0, 2)) + stealee.HasExistingRequest = func(r Request) bool { + _, ok := keepReqs[r] + return ok + } + stealee.Id = intPeerId(1) + firstStealer := basePeer + firstStealer.Id = intPeerId(2) + secondStealer := basePeer + secondStealer.Id = intPeerId(3) + results := order.DoRequests([]*Torrent{{ + Pieces: []Piece{{ + Request: true, + NumPendingChunks: 9, + IterPendingChunks: chunkIterRange(9), + }}, + Peers: []Peer{ + firstStealer, + stealee, + secondStealer, + }, + }}) + c.Assert(results, qt.HasLen, 3) + check := func(p PeerId, l int) { + c.Check(results[p].Requests, qt.HasLen, l) + c.Check(results[p].Interested, qt.Equals, l > 0) + } + check(firstStealer.Id, 3) + check(secondStealer.Id, 3) + c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{ + Interested: true, + Requests: keepReqs, + }) +} diff --git a/request-strategy/piece.go b/request-strategy/piece.go index ec778a8c..508ed829 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -13,3 +13,10 @@ type Piece struct { NumPendingChunks int IterPendingChunks func(func(types.ChunkSpec)) } + +func (p *Piece) iterPendingChunksWrapper(f func(ChunkSpec)) { + i := p.IterPendingChunks + if i != nil { + i(f) + } +} -- 2.48.1