]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add a working request strategy test
authorMatt Joiner <anacrolix@gmail.com>
Thu, 13 May 2021 01:26:22 +0000 (11:26 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
request-strategy.go
request-strategy/order.go
request-strategy/order_test.go
request-strategy/peer.go

index a1221ad6b1e46b0e0c66bf43004fc74611abff67..cb30d39aa0853fc20b8a551a18767b20fa4e9f38 100644 (file)
@@ -52,9 +52,9 @@ func (cl *Client) doRequests() {
                        if p.closed.IsSet() {
                                return
                        }
-                       rst.Peers = append(rst.Peers, &request_strategy.Peer{
+                       rst.Peers = append(rst.Peers, request_strategy.Peer{
                                HasPiece:    p.peerHasPiece,
-                               MaxRequests: p.nominalMaxRequests,
+                               MaxRequests: p.nominalMaxRequests(),
                                HasExistingRequest: func(r request_strategy.Request) bool {
                                        _, ok := p.requests[r]
                                        return ok
@@ -65,7 +65,7 @@ func (cl *Client) doRequests() {
                                },
                                DownloadRate: p.downloadRate(),
                                Age:          time.Since(p.completedHandshake),
-                               Id:           unsafe.Pointer(p),
+                               Id:           (*peerId)(p),
                        })
                })
                ts = append(ts, rst)
@@ -76,8 +76,14 @@ func (cl *Client) doRequests() {
        }
 }
 
-func applyPeerNextRequestState(_p request_strategy.PeerPointer, rp request_strategy.PeerNextRequestState) {
-       p := (*Peer)(_p)
+type peerId Peer
+
+func (p *peerId) Uintptr() uintptr {
+       return uintptr(unsafe.Pointer(p))
+}
+
+func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
+       p := (*Peer)(_p.(*peerId))
        p.setInterested(rp.Interested)
        for req := range p.requests {
                if _, ok := rp.Requests[req]; !ok {
index 3c7d82abe022136e87b32933a7f8d2b91b66183d..f0a5eaef7632cd9e1325f62d91546947b4614c47 100644 (file)
@@ -6,12 +6,15 @@ import (
        "github.com/anacrolix/multiless"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/types"
+       "github.com/davecgh/go-spew/spew"
 )
 
 type (
        Request       = types.Request
        pieceIndex    = types.PieceIndex
        piecePriority = types.PiecePriority
+       // This can be made into a type-param later, will be great for testing.
+       ChunkSpec = types.ChunkSpec
 )
 
 type ClientPieceOrder struct {
@@ -43,13 +46,13 @@ func (me ClientPieceOrder) less(_i, _j int) bool {
 }
 
 type requestsPeer struct {
-       *Peer
+       Peer
        nextState                  PeerNextRequestState
        requestablePiecesRemaining int
 }
 
 func (rp *requestsPeer) canFitRequest() bool {
-       return len(rp.nextState.Requests) < rp.MaxRequests()
+       return len(rp.nextState.Requests) < rp.MaxRequests
 }
 
 // Returns true if it is added and wasn't there before.
@@ -69,7 +72,6 @@ type peersForPieceRequests struct {
 
 func (me *peersForPieceRequests) addNextRequest(r Request) {
        if me.requestsPeer.addNextRequest(r) {
-               return
                me.requestsInPiece++
        }
 }
@@ -77,10 +79,10 @@ func (me *peersForPieceRequests) addNextRequest(r Request) {
 type Torrent struct {
        Pieces   []Piece
        Capacity *func() *int64
-       Peers    []*Peer // not closed.
+       Peers    []Peer // not closed.
 }
 
-func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPointer]PeerNextRequestState {
+func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
        requestOrder.pieces = requestOrder.pieces[:0]
        allPeers := make(map[*Torrent][]*requestsPeer)
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
@@ -153,57 +155,60 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPo
                                        int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
                                        // TODO: Probably peer priority can come next
                                ).Uintptr(
-                                       uintptr(peersForPiece[j].Id),
-                                       uintptr(peersForPiece[i].Id),
+                                       peersForPiece[i].Id.Uintptr(),
+                                       peersForPiece[j].Id.Uintptr(),
                                ).MustLess()
                        })
                }
                pendingChunksRemaining := int(p.NumPendingChunks)
-               torrentPiece.IterPendingChunks(func(chunk types.ChunkSpec) {
-                       req := Request{pp.Integer(p.index), chunk}
-                       pendingChunksRemaining--
-                       sortPeersForPiece()
-                       skipped := 0
-                       // Try up to the number of peers that could legitimately receive the request equal to
-                       // the number of chunks left. This should ensure that only the best peers serve the last
-                       // few chunks in a piece.
-                       for _, peer := range peersForPiece {
-                               if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.PieceAllowedFast(p.index) && peer.Choking) {
-                                       continue
-                               }
-                               if skipped > pendingChunksRemaining {
-                                       break
-                               }
-                               if !peer.HasExistingRequest(req) {
-                                       skipped++
-                                       continue
-                               }
-                               if !peer.PieceAllowedFast(p.index) {
-                                       // We must stay interested for this.
-                                       peer.nextState.Interested = true
-                               }
-                               peer.addNextRequest(req)
-                               return
-                       }
-                       for _, peer := range peersForPiece {
-                               if !peer.canFitRequest() {
-                                       continue
-                               }
-                               if !peer.HasPiece(p.index) {
-                                       continue
+               if f := torrentPiece.IterPendingChunks; f != nil {
+                       f(func(chunk types.ChunkSpec) {
+                               req := Request{pp.Integer(p.index), chunk}
+                               pendingChunksRemaining--
+                               sortPeersForPiece()
+                               spew.Dump(peersForPiece)
+                               skipped := 0
+                               // Try up to the number of peers that could legitimately receive the request equal to
+                               // the number of chunks left. This should ensure that only the best peers serve the last
+                               // few chunks in a piece.
+                               for _, peer := range peersForPiece {
+                                       if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
+                                               continue
+                                       }
+                                       if skipped >= pendingChunksRemaining {
+                                               break
+                                       }
+                                       if f := peer.HasExistingRequest; f == nil || !f(req) {
+                                               skipped++
+                                               continue
+                                       }
+                                       if !peer.pieceAllowedFastOrDefault(p.index) {
+                                               // We must stay interested for this.
+                                               peer.nextState.Interested = true
+                                       }
+                                       peer.addNextRequest(req)
+                                       return
                                }
-                               if !peer.PieceAllowedFast(p.index) {
-                                       // TODO: Verify that's okay to stay uninterested if we request allowed fast
-                                       // pieces.
-                                       peer.nextState.Interested = true
-                                       if peer.Choking {
+                               for _, peer := range peersForPiece {
+                                       if !peer.canFitRequest() {
+                                               continue
+                                       }
+                                       if !peer.HasPiece(p.index) {
                                                continue
                                        }
+                                       if !peer.pieceAllowedFastOrDefault(p.index) {
+                                               // TODO: Verify that's okay to stay uninterested if we request allowed fast
+                                               // pieces.
+                                               peer.nextState.Interested = true
+                                               if peer.Choking {
+                                                       continue
+                                               }
+                                       }
+                                       peer.addNextRequest(req)
+                                       return
                                }
-                               peer.addNextRequest(req)
-                               return
-                       }
-               })
+                       })
+               }
                if pendingChunksRemaining != 0 {
                        panic(pendingChunksRemaining)
                }
@@ -213,7 +218,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPo
                        }
                }
        }
-       ret := make(map[PeerPointer]PeerNextRequestState)
+       ret := make(map[PeerId]PeerNextRequestState)
        for _, peers := range allPeers {
                for _, rp := range peers {
                        if rp.requestablePiecesRemaining != 0 {
index e014624e5504b1c774e51fc63fce91f897aee751..c972b301f3de022f7851bef2d08ac640a4b01a28 100644 (file)
@@ -1 +1,82 @@
 package request_strategy
+
+import (
+       "math"
+       "testing"
+
+       pp "github.com/anacrolix/torrent/peer_protocol"
+       qt "github.com/frankban/quicktest"
+)
+
+func r(i pieceIndex, begin int) Request {
+       return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
+}
+
+func chunkIter(offsets ...int) func(func(ChunkSpec)) {
+       return func(f func(ChunkSpec)) {
+               for _, offset := range offsets {
+                       f(ChunkSpec{pp.Integer(offset), 1})
+               }
+       }
+}
+
+func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) {
+       ret = make(map[Request]struct{}, len(rs))
+       for _, r := range rs {
+               ret[r] = struct{}{}
+       }
+       return
+}
+
+type intPeerId int
+
+func (i intPeerId) Uintptr() uintptr {
+       return uintptr(i)
+}
+
+func TestStealingFromSlowerPeers(t *testing.T) {
+       c := qt.New(t)
+       order := ClientPieceOrder{}
+       basePeer := Peer{
+               HasPiece: func(i pieceIndex) bool {
+                       return true
+               },
+               MaxRequests:  math.MaxInt16,
+               DownloadRate: 2,
+       }
+       stealee := basePeer
+       stealee.DownloadRate = 1
+       stealee.HasExistingRequest = func(r Request) bool {
+               return true
+       }
+       stealee.Id = intPeerId(1)
+       firstStealer := basePeer
+       firstStealer.Id = intPeerId(2)
+       secondStealer := basePeer
+       secondStealer.Id = intPeerId(3)
+       c.Assert(order.DoRequests([]*Torrent{{
+               Pieces: []Piece{{
+                       Request:           true,
+                       NumPendingChunks:  2,
+                       IterPendingChunks: chunkIter(0, 1),
+               }},
+               Peers: []Peer{
+                       stealee,
+                       firstStealer,
+                       secondStealer,
+               },
+       }}), qt.ContentEquals, map[PeerId]PeerNextRequestState{
+               intPeerId(2): {
+                       Interested: true,
+                       Requests:   requestSetFromSlice(r(0, 0)),
+               },
+               intPeerId(3): {
+                       Interested: true,
+                       Requests:   requestSetFromSlice(r(0, 1)),
+               },
+               stealee.Id: {
+                       Interested: false,
+                       Requests:   requestSetFromSlice(),
+               },
+       })
+}
index 4a3d0689a4371bce9045b3ecdffd78aa97872f28..21ef0d2eca92e0a54ea0271b3aa2776bf1555efd 100644 (file)
@@ -2,7 +2,6 @@ package request_strategy
 
 import (
        "time"
-       "unsafe"
 )
 
 type PeerNextRequestState struct {
@@ -10,20 +9,30 @@ type PeerNextRequestState struct {
        Requests   map[Request]struct{}
 }
 
-type PeerPointer = unsafe.Pointer
+type PeerId interface {
+       Uintptr() uintptr
+}
 
 type Peer struct {
-       HasPiece           func(pieceIndex) bool
-       MaxRequests        func() int
-       HasExistingRequest func(Request) bool
+       HasPiece           func(pieceIndex) bool
+       MaxRequests        int
+       HasExistingRequest func(Request) bool
        Choking            bool
        PieceAllowedFast   func(pieceIndex) bool
        DownloadRate       float64
        Age                time.Duration
-       Id                 PeerPointer
+       // This is passed back out at the end, so must support equality. Could be a type-param later.
+       Id PeerId
+}
+
+func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool {
+       if f := p.PieceAllowedFast; f != nil {
+               return f(i)
+       }
+       return false
 }
 
 // TODO: This might be used in more places I think.
 func (p *Peer) canRequestPiece(i pieceIndex) bool {
-       return p.HasPiece(i) && (!p.Choking || p.PieceAllowedFast(i))
+       return p.HasPiece(i) && (!p.Choking || (p.PieceAllowedFast != nil && p.PieceAllowedFast(i)))
 }