Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix issue with duplicate preallocated requests
author    Matt Joiner <anacrolix@gmail.com>
          Fri, 21 May 2021 04:02:45 +0000 (14:02 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Mon, 7 Jun 2021 03:01:40 +0000 (13:01 +1000)
Likely introduced by allowing actual and next request states to differ.
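
For context, a minimal standalone Go sketch of the failure mode and the fix. The ChunkSpec and peer types below are simplified stand-ins for illustration, not the package's real types.

package main

import "fmt"

// Simplified stand-ins for the real request-strategy types.
type ChunkSpec struct{ Begin, Length int }

type peer struct {
	id              int
	requestsInPiece int
}

func main() {
	chunk := ChunkSpec{Begin: 0, Length: 16384}
	a := &peer{id: 1}
	b := &peer{id: 2}

	// Before the fix: the preallocation map holds a single peer per chunk, so
	// when both peers report an existing request for the same chunk, the
	// second silently clobbers the first and peer 1's bookkeeping is never
	// unwound.
	single := make(map[ChunkSpec]*peer)
	for _, p := range []*peer{a, b} {
		p.requestsInPiece++
		single[chunk] = p
	}
	fmt.Println(single[chunk].id) // 2: peer 1's preallocation was lost

	// After the fix: a slice value keeps every peer that preallocated the
	// chunk, so the later reallocation pass can decrement counters and remove
	// the request from all of them.
	a.requestsInPiece, b.requestsInPiece = 0, 0
	multi := make(map[ChunkSpec][]*peer)
	for _, p := range []*peer{a, b} {
		p.requestsInPiece++
		multi[chunk] = append(multi[chunk], p)
	}
	for _, p := range multi[chunk] {
		p.requestsInPiece--
	}
	fmt.Println(len(multi[chunk]), a.requestsInPiece, b.requestsInPiece) // 2 0 0
}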

request-strategy/order.go
request-strategy/order_test.go

index 2ac04ab4a0bbcf32359914dc23940f0d4735e8a7..2a364eb85cc733caa1f9e40bf511b59dc371a70d 100644 (file)
@@ -1,9 +1,11 @@
 package request_strategy
 
 import (
+       "fmt"
        "sort"
 
        "github.com/anacrolix/multiless"
+
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/types"
 )
@@ -160,7 +162,7 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
                        peers = append(peers, &requestsPeer{
                                Peer: p,
                                nextState: PeerNextRequestState{
-                                       Requests: make(map[Request]struct{}),
+                                       Requests: make(map[Request]struct{}, p.MaxRequests),
                                },
                        })
                }
@@ -182,12 +184,29 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
                        if rp.requestablePiecesRemaining != 0 {
                                panic(rp.requestablePiecesRemaining)
                        }
+                       if _, ok := ret[rp.Id]; ok {
+                               panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
+                       }
                        ret[rp.Id] = rp.nextState
                }
        }
        return ret
 }
 
+// Checks that a sorted peersForPiece slice is correctly ordered and contains no duplicate peers.
+func ensureValidSortedPeersForPieceRequests(peers []*peersForPieceRequests, sortLess func(_, _ int) bool) {
+       if !sort.SliceIsSorted(peers, sortLess) {
+               panic("not sorted")
+       }
+       peerMap := make(map[*peersForPieceRequests]struct{}, len(peers))
+       for _, p := range peers {
+               if _, ok := peerMap[p]; ok {
+                       panic(p)
+               }
+               peerMap[p] = struct{}{}
+       }
+}
+
 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
        peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
        for _, peer := range peers {
@@ -204,7 +223,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                }
        }()
        sortPeersForPiece := func(req *Request) {
-               sort.Slice(peersForPiece, func(i, j int) bool {
+               less := func(i, j int) bool {
                        byHasRequest := func() multiless.Computation {
                                ml := multiless.New()
                                if req != nil {
@@ -246,9 +265,13 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                                peersForPiece[i].Id.Uintptr(),
                                peersForPiece[j].Id.Uintptr(),
                        ).MustLess()
-               })
+               }
+               sort.Slice(peersForPiece, less)
+               ensureValidSortedPeersForPieceRequests(peersForPiece, less)
        }
-       preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
+       // Chunks can be preassigned several times if peers haven't been able to update their "actual"
+       // request state to match the "next" one before another request strategy run occurs.
+       preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
        p.IterPendingChunks(func(spec ChunkSpec) {
                req := Request{pp.Integer(p.index), spec}
                for _, peer := range peersForPiece {
@@ -261,7 +284,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                        if !peer.canRequestPiece(p.index) {
                                continue
                        }
-                       preallocated[spec] = peer
+                       preallocated[spec] = append(preallocated[spec], peer)
                        peer.addNextRequest(req)
                }
        })
@@ -292,12 +315,16 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                }
        })
 chunk:
-       for chunk, prePeer := range preallocated {
+       for chunk, prePeers := range preallocated {
                pendingChunksRemaining--
                req := Request{pp.Integer(p.index), chunk}
-               prePeer.requestsInPiece--
+               for _, pp := range prePeers {
+                       pp.requestsInPiece--
+               }
                sortPeersForPiece(&req)
-               delete(prePeer.nextState.Requests, req)
+               for _, pp := range prePeers {
+                       delete(pp.nextState.Requests, req)
+               }
                for _, peer := range peersForPiece {
                        if !peer.canFitRequest() {
                                continue
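
The ensureValidSortedPeersForPieceRequests helper added above enforces two invariants after every sort: the comparator actually ordered the slice, and no peer appears twice by pointer identity. A hedged sketch of the same pattern on simplified data follows; elem and ensureValidSorted are illustrative names, not the package's API.

package main

import "sort"

type elem struct{ rank int }

// ensureValidSorted panics unless items is ordered by less and contains no
// element (by pointer identity) more than once. A duplicate pointer would mean
// per-peer accounting such as requestsInPiece runs twice for one peer.
func ensureValidSorted(items []*elem, less func(i, j int) bool) {
	if !sort.SliceIsSorted(items, less) {
		panic("not sorted")
	}
	seen := make(map[*elem]struct{}, len(items))
	for _, it := range items {
		if _, ok := seen[it]; ok {
			panic("duplicate element")
		}
		seen[it] = struct{}{}
	}
}

func main() {
	items := []*elem{{rank: 2}, {rank: 1}, {rank: 3}}
	less := func(i, j int) bool { return items[i].rank < items[j].rank }
	sort.Slice(items, less)
	ensureValidSorted(items, less)
}
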
index a448ac3791ac6d9a3feccb6ec70e7e7a61fb8cdc..d37b0423ec0b5192dd93f447d1c5b3783102759b 100644 (file)
@@ -4,9 +4,10 @@ import (
        "math"
        "testing"
 
-       pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/bradfitz/iter"
        qt "github.com/frankban/quicktest"
+
+       pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
 func r(i pieceIndex, begin int) Request {
@@ -253,3 +254,44 @@ func TestDontStealUnnecessarily(t *testing.T) {
                Requests:   requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
        })
 }
+
+// This tests a situation where multiple peers had the same existing request, due to "actual" and
+// "next" request states being out of sync. This reasonable occurs when a peer hasn't fully updated
+// its actual request state since the last request strategy run.
+func TestDuplicatePreallocations(t *testing.T) {
+       peer := func(id int, downloadRate float64) Peer {
+               return Peer{
+                       HasExistingRequest: func(r Request) bool {
+                               return true
+                       },
+                       MaxRequests: 2,
+                       HasPiece: func(i pieceIndex) bool {
+                               return true
+                       },
+                       Id:           intPeerId(id),
+                       DownloadRate: downloadRate,
+               }
+       }
+       results := Run(Input{
+               Torrents: []Torrent{{
+                       Pieces: []Piece{{
+                               Request:           true,
+                               NumPendingChunks:  1,
+                               IterPendingChunks: chunkIterRange(1),
+                       }, {
+                               Request:           true,
+                               NumPendingChunks:  1,
+                               IterPendingChunks: chunkIterRange(1),
+                       }},
+                       Peers: []Peer{
+                               // Before this fix, the second peer would be marked as the preallocation, clobbering
+                               // the first. The first peer is preferred, and the piece isn't striped, so it gets
+                               // preallocated a request, and the same request is then reallocated to it from the
+                               // second peer.
+                               peer(1, 2),
+                               peer(2, 1),
+                       },
+               }},
+       })
+       c := qt.New(t)
+       c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests))
+}
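
For orientation, a hedged sketch of the actual-versus-next desync the commit message and test comment describe; peerConn and hasExistingRequest are hypothetical caller-side names, not the torrent client's real wiring. Each strategy run computes a fresh next state, but a peer's actual outstanding requests converge to it asynchronously, so two peers can simultaneously report the same request as existing.

package main

import "fmt"

type Request struct{ Piece, Begin int }

// peerConn models only what matters here: the requests a peer has actually
// sent on the wire, which lag behind the last computed "next" state.
type peerConn struct {
	id     int
	actual map[Request]struct{}
}

func (p *peerConn) hasExistingRequest(r Request) bool {
	_, ok := p.actual[r]
	return ok
}

func main() {
	r := Request{Piece: 0, Begin: 0}
	// Suppose one strategy run assigned r to peer 1, and a later run moved it
	// to peer 2 before peer 1 got around to cancelling it. Both peers now
	// answer true for the same request, so the next run preallocates the same
	// chunk for both of them.
	p1 := &peerConn{id: 1, actual: map[Request]struct{}{r: {}}}
	p2 := &peerConn{id: 2, actual: map[Request]struct{}{r: {}}}
	fmt.Println(p1.hasExistingRequest(r), p2.hasExistingRequest(r)) // true true
}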