From f50b8fc2fa6d6069ae461860af5a0f209f05b3c9 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 21 May 2021 14:02:45 +1000 Subject: [PATCH] Fix issue with duplicate preallocated requests Likely introduced by allowing actual and next request states to differ. --- request-strategy/order.go | 43 ++++++++++++++++++++++++++------- request-strategy/order_test.go | 44 +++++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/request-strategy/order.go b/request-strategy/order.go index 2ac04ab4..2a364eb8 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -1,9 +1,11 @@ package request_strategy import ( + "fmt" "sort" "github.com/anacrolix/multiless" + pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/types" ) @@ -160,7 +162,7 @@ func Run(input Input) map[PeerId]PeerNextRequestState { peers = append(peers, &requestsPeer{ Peer: p, nextState: PeerNextRequestState{ - Requests: make(map[Request]struct{}), + Requests: make(map[Request]struct{}, p.MaxRequests), }, }) } @@ -182,12 +184,29 @@ func Run(input Input) map[PeerId]PeerNextRequestState { if rp.requestablePiecesRemaining != 0 { panic(rp.requestablePiecesRemaining) } + if _, ok := ret[rp.Id]; ok { + panic(fmt.Sprintf("duplicate peer id: %v", rp.Id)) + } ret[rp.Id] = rp.nextState } } return ret } +// Checks that a sorted peersForPiece slice makes sense. +func ensureValidSortedPeersForPieceRequests(peers []*peersForPieceRequests, sortLess func(_, _ int) bool) { + if !sort.SliceIsSorted(peers, sortLess) { + panic("not sorted") + } + peerMap := make(map[*peersForPieceRequests]struct{}, len(peers)) + for _, p := range peers { + if _, ok := peerMap[p]; ok { + panic(p) + } + peerMap[p] = struct{}{} + } +} + func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { peersForPiece := make([]*peersForPieceRequests, 0, len(peers)) for _, peer := range peers { @@ -204,7 +223,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { } }() sortPeersForPiece := func(req *Request) { - sort.Slice(peersForPiece, func(i, j int) bool { + less := func(i, j int) bool { byHasRequest := func() multiless.Computation { ml := multiless.New() if req != nil { @@ -246,9 +265,13 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { peersForPiece[i].Id.Uintptr(), peersForPiece[j].Id.Uintptr(), ).MustLess() - }) + } + sort.Slice(peersForPiece, less) + ensureValidSortedPeersForPieceRequests(peersForPiece, less) } - preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks) + // Chunks can be preassigned several times, if peers haven't been able to update their "actual" + // with "next" request state before another request strategy run occurs. + preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks) p.IterPendingChunks(func(spec ChunkSpec) { req := Request{pp.Integer(p.index), spec} for _, peer := range peersForPiece { @@ -261,7 +284,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { if !peer.canRequestPiece(p.index) { continue } - preallocated[spec] = peer + preallocated[spec] = append(preallocated[spec], peer) peer.addNextRequest(req) } }) @@ -292,12 +315,16 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { } }) chunk: - for chunk, prePeer := range preallocated { + for chunk, prePeers := range preallocated { pendingChunksRemaining-- req := Request{pp.Integer(p.index), chunk} - prePeer.requestsInPiece-- + for _, pp := range prePeers { + pp.requestsInPiece-- + } sortPeersForPiece(&req) - delete(prePeer.nextState.Requests, req) + for _, pp := range prePeers { + delete(pp.nextState.Requests, req) + } for _, peer := range peersForPiece { if !peer.canFitRequest() { continue diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index a448ac37..d37b0423 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -4,9 +4,10 @@ import ( "math" "testing" - pp "github.com/anacrolix/torrent/peer_protocol" "github.com/bradfitz/iter" qt "github.com/frankban/quicktest" + + pp "github.com/anacrolix/torrent/peer_protocol" ) func r(i pieceIndex, begin int) Request { @@ -253,3 +254,44 @@ func TestDontStealUnnecessarily(t *testing.T) { Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)), }) } + +// This tests a situation where multiple peers had the same existing request, due to "actual" and +// "next" request states being out of sync. This reasonable occurs when a peer hasn't fully updated +// its actual request state since the last request strategy run. +func TestDuplicatePreallocations(t *testing.T) { + peer := func(id int, downloadRate float64) Peer { + return Peer{ + HasExistingRequest: func(r Request) bool { + return true + }, + MaxRequests: 2, + HasPiece: func(i pieceIndex) bool { + return true + }, + Id: intPeerId(id), + DownloadRate: downloadRate, + } + } + results := Run(Input{ + Torrents: []Torrent{{ + Pieces: []Piece{{ + Request: true, + NumPendingChunks: 1, + IterPendingChunks: chunkIterRange(1), + }, { + Request: true, + NumPendingChunks: 1, + IterPendingChunks: chunkIterRange(1), + }}, + Peers: []Peer{ + // The second peer was be marked as the preallocation, clobbering the first. The + // first peer is preferred, and the piece isn't striped, so it gets preallocated a + // request, and then gets reallocated from the peer the same request. + peer(1, 2), + peer(2, 1), + }, + }}, + }) + c := qt.New(t) + c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests)) +} -- 2.44.0