From 26837ee73c872a1a8131f778c780c3a0f84d1bc7 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 13 May 2021 11:26:22 +1000 Subject: [PATCH] Add a working request strategy test --- request-strategy.go | 16 +++-- request-strategy/order.go | 103 +++++++++++++++++---------------- request-strategy/order_test.go | 81 ++++++++++++++++++++++++++ request-strategy/peer.go | 23 +++++--- 4 files changed, 162 insertions(+), 61 deletions(-) diff --git a/request-strategy.go b/request-strategy.go index a1221ad6..cb30d39a 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -52,9 +52,9 @@ func (cl *Client) doRequests() { if p.closed.IsSet() { return } - rst.Peers = append(rst.Peers, &request_strategy.Peer{ + rst.Peers = append(rst.Peers, request_strategy.Peer{ HasPiece: p.peerHasPiece, - MaxRequests: p.nominalMaxRequests, + MaxRequests: p.nominalMaxRequests(), HasExistingRequest: func(r request_strategy.Request) bool { _, ok := p.requests[r] return ok @@ -65,7 +65,7 @@ func (cl *Client) doRequests() { }, DownloadRate: p.downloadRate(), Age: time.Since(p.completedHandshake), - Id: unsafe.Pointer(p), + Id: (*peerId)(p), }) }) ts = append(ts, rst) @@ -76,8 +76,14 @@ func (cl *Client) doRequests() { } } -func applyPeerNextRequestState(_p request_strategy.PeerPointer, rp request_strategy.PeerNextRequestState) { - p := (*Peer)(_p) +type peerId Peer + +func (p *peerId) Uintptr() uintptr { + return uintptr(unsafe.Pointer(p)) +} + +func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { + p := (*Peer)(_p.(*peerId)) p.setInterested(rp.Interested) for req := range p.requests { if _, ok := rp.Requests[req]; !ok { diff --git a/request-strategy/order.go b/request-strategy/order.go index 3c7d82ab..f0a5eaef 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -6,12 +6,15 @@ import ( "github.com/anacrolix/multiless" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/types" + "github.com/davecgh/go-spew/spew" ) type ( Request = types.Request pieceIndex = types.PieceIndex piecePriority = types.PiecePriority + // This can be made into a type-param later, will be great for testing. + ChunkSpec = types.ChunkSpec ) type ClientPieceOrder struct { @@ -43,13 +46,13 @@ func (me ClientPieceOrder) less(_i, _j int) bool { } type requestsPeer struct { - *Peer + Peer nextState PeerNextRequestState requestablePiecesRemaining int } func (rp *requestsPeer) canFitRequest() bool { - return len(rp.nextState.Requests) < rp.MaxRequests() + return len(rp.nextState.Requests) < rp.MaxRequests } // Returns true if it is added and wasn't there before. @@ -69,7 +72,6 @@ type peersForPieceRequests struct { func (me *peersForPieceRequests) addNextRequest(r Request) { if me.requestsPeer.addNextRequest(r) { - return me.requestsInPiece++ } } @@ -77,10 +79,10 @@ func (me *peersForPieceRequests) addNextRequest(r Request) { type Torrent struct { Pieces []Piece Capacity *func() *int64 - Peers []*Peer // not closed. + Peers []Peer // not closed. } -func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPointer]PeerNextRequestState { +func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState { requestOrder.pieces = requestOrder.pieces[:0] allPeers := make(map[*Torrent][]*requestsPeer) // Storage capacity left for this run, keyed by the storage capacity pointer on the storage @@ -153,57 +155,60 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPo int64(peersForPiece[j].Age), int64(peersForPiece[i].Age), // TODO: Probably peer priority can come next ).Uintptr( - uintptr(peersForPiece[j].Id), - uintptr(peersForPiece[i].Id), + peersForPiece[i].Id.Uintptr(), + peersForPiece[j].Id.Uintptr(), ).MustLess() }) } pendingChunksRemaining := int(p.NumPendingChunks) - torrentPiece.IterPendingChunks(func(chunk types.ChunkSpec) { - req := Request{pp.Integer(p.index), chunk} - pendingChunksRemaining-- - sortPeersForPiece() - skipped := 0 - // Try up to the number of peers that could legitimately receive the request equal to - // the number of chunks left. This should ensure that only the best peers serve the last - // few chunks in a piece. - for _, peer := range peersForPiece { - if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.PieceAllowedFast(p.index) && peer.Choking) { - continue - } - if skipped > pendingChunksRemaining { - break - } - if !peer.HasExistingRequest(req) { - skipped++ - continue - } - if !peer.PieceAllowedFast(p.index) { - // We must stay interested for this. - peer.nextState.Interested = true - } - peer.addNextRequest(req) - return - } - for _, peer := range peersForPiece { - if !peer.canFitRequest() { - continue - } - if !peer.HasPiece(p.index) { - continue + if f := torrentPiece.IterPendingChunks; f != nil { + f(func(chunk types.ChunkSpec) { + req := Request{pp.Integer(p.index), chunk} + pendingChunksRemaining-- + sortPeersForPiece() + spew.Dump(peersForPiece) + skipped := 0 + // Try up to the number of peers that could legitimately receive the request equal to + // the number of chunks left. This should ensure that only the best peers serve the last + // few chunks in a piece. + for _, peer := range peersForPiece { + if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) { + continue + } + if skipped >= pendingChunksRemaining { + break + } + if f := peer.HasExistingRequest; f == nil || !f(req) { + skipped++ + continue + } + if !peer.pieceAllowedFastOrDefault(p.index) { + // We must stay interested for this. + peer.nextState.Interested = true + } + peer.addNextRequest(req) + return } - if !peer.PieceAllowedFast(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast - // pieces. - peer.nextState.Interested = true - if peer.Choking { + for _, peer := range peersForPiece { + if !peer.canFitRequest() { + continue + } + if !peer.HasPiece(p.index) { continue } + if !peer.pieceAllowedFastOrDefault(p.index) { + // TODO: Verify that's okay to stay uninterested if we request allowed fast + // pieces. + peer.nextState.Interested = true + if peer.Choking { + continue + } + } + peer.addNextRequest(req) + return } - peer.addNextRequest(req) - return - } - }) + }) + } if pendingChunksRemaining != 0 { panic(pendingChunksRemaining) } @@ -213,7 +218,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPo } } } - ret := make(map[PeerPointer]PeerNextRequestState) + ret := make(map[PeerId]PeerNextRequestState) for _, peers := range allPeers { for _, rp := range peers { if rp.requestablePiecesRemaining != 0 { diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index e014624e..c972b301 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -1 +1,82 @@ package request_strategy + +import ( + "math" + "testing" + + pp "github.com/anacrolix/torrent/peer_protocol" + qt "github.com/frankban/quicktest" +) + +func r(i pieceIndex, begin int) Request { + return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}} +} + +func chunkIter(offsets ...int) func(func(ChunkSpec)) { + return func(f func(ChunkSpec)) { + for _, offset := range offsets { + f(ChunkSpec{pp.Integer(offset), 1}) + } + } +} + +func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) { + ret = make(map[Request]struct{}, len(rs)) + for _, r := range rs { + ret[r] = struct{}{} + } + return +} + +type intPeerId int + +func (i intPeerId) Uintptr() uintptr { + return uintptr(i) +} + +func TestStealingFromSlowerPeers(t *testing.T) { + c := qt.New(t) + order := ClientPieceOrder{} + basePeer := Peer{ + HasPiece: func(i pieceIndex) bool { + return true + }, + MaxRequests: math.MaxInt16, + DownloadRate: 2, + } + stealee := basePeer + stealee.DownloadRate = 1 + stealee.HasExistingRequest = func(r Request) bool { + return true + } + stealee.Id = intPeerId(1) + firstStealer := basePeer + firstStealer.Id = intPeerId(2) + secondStealer := basePeer + secondStealer.Id = intPeerId(3) + c.Assert(order.DoRequests([]*Torrent{{ + Pieces: []Piece{{ + Request: true, + NumPendingChunks: 2, + IterPendingChunks: chunkIter(0, 1), + }}, + Peers: []Peer{ + stealee, + firstStealer, + secondStealer, + }, + }}), qt.ContentEquals, map[PeerId]PeerNextRequestState{ + intPeerId(2): { + Interested: true, + Requests: requestSetFromSlice(r(0, 0)), + }, + intPeerId(3): { + Interested: true, + Requests: requestSetFromSlice(r(0, 1)), + }, + stealee.Id: { + Interested: false, + Requests: requestSetFromSlice(), + }, + }) +} diff --git a/request-strategy/peer.go b/request-strategy/peer.go index 4a3d0689..21ef0d2e 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -2,7 +2,6 @@ package request_strategy import ( "time" - "unsafe" ) type PeerNextRequestState struct { @@ -10,20 +9,30 @@ type PeerNextRequestState struct { Requests map[Request]struct{} } -type PeerPointer = unsafe.Pointer +type PeerId interface { + Uintptr() uintptr +} type Peer struct { - HasPiece func(pieceIndex) bool - MaxRequests func() int - HasExistingRequest func(Request) bool + HasPiece func(i pieceIndex) bool + MaxRequests int + HasExistingRequest func(r Request) bool Choking bool PieceAllowedFast func(pieceIndex) bool DownloadRate float64 Age time.Duration - Id PeerPointer + // This is passed back out at the end, so must support equality. Could be a type-param later. + Id PeerId +} + +func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool { + if f := p.PieceAllowedFast; f != nil { + return f(i) + } + return false } // TODO: This might be used in more places I think. func (p *Peer) canRequestPiece(i pieceIndex) bool { - return p.HasPiece(i) && (!p.Choking || p.PieceAllowedFast(i)) + return p.HasPiece(i) && (!p.Choking || (p.PieceAllowedFast != nil && p.PieceAllowedFast(i))) } -- 2.48.1