if p.closed.IsSet() {
return
}
- rst.Peers = append(rst.Peers, &request_strategy.Peer{
+ rst.Peers = append(rst.Peers, request_strategy.Peer{
HasPiece: p.peerHasPiece,
- MaxRequests: p.nominalMaxRequests,
+ MaxRequests: p.nominalMaxRequests(),
HasExistingRequest: func(r request_strategy.Request) bool {
_, ok := p.requests[r]
return ok
},
DownloadRate: p.downloadRate(),
Age: time.Since(p.completedHandshake),
- Id: unsafe.Pointer(p),
+ Id: (*peerId)(p),
})
})
ts = append(ts, rst)
}
}
-func applyPeerNextRequestState(_p request_strategy.PeerPointer, rp request_strategy.PeerNextRequestState) {
- p := (*Peer)(_p)
+type peerId Peer
+
+func (p *peerId) Uintptr() uintptr {
+ return uintptr(unsafe.Pointer(p))
+}
+
+func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
+ p := (*Peer)(_p.(*peerId))
p.setInterested(rp.Interested)
for req := range p.requests {
if _, ok := rp.Requests[req]; !ok {
"github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
+ "github.com/davecgh/go-spew/spew"
)
type (
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
+ // This can be made into a type-param later, will be great for testing.
+ ChunkSpec = types.ChunkSpec
)
type ClientPieceOrder struct {
}
type requestsPeer struct {
- *Peer
+ Peer
nextState PeerNextRequestState
requestablePiecesRemaining int
}
func (rp *requestsPeer) canFitRequest() bool {
- return len(rp.nextState.Requests) < rp.MaxRequests()
+ return len(rp.nextState.Requests) < rp.MaxRequests
}
// Returns true if it is added and wasn't there before.
func (me *peersForPieceRequests) addNextRequest(r Request) {
if me.requestsPeer.addNextRequest(r) {
- return
me.requestsInPiece++
}
}
type Torrent struct {
Pieces []Piece
Capacity *func() *int64
- Peers []*Peer // not closed.
+ Peers []Peer // not closed.
}
-func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPointer]PeerNextRequestState {
+func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
requestOrder.pieces = requestOrder.pieces[:0]
allPeers := make(map[*Torrent][]*requestsPeer)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
- uintptr(peersForPiece[j].Id),
- uintptr(peersForPiece[i].Id),
+ peersForPiece[i].Id.Uintptr(),
+ peersForPiece[j].Id.Uintptr(),
).MustLess()
})
}
pendingChunksRemaining := int(p.NumPendingChunks)
- torrentPiece.IterPendingChunks(func(chunk types.ChunkSpec) {
- req := Request{pp.Integer(p.index), chunk}
- pendingChunksRemaining--
- sortPeersForPiece()
- skipped := 0
- // Try up to the number of peers that could legitimately receive the request equal to
- // the number of chunks left. This should ensure that only the best peers serve the last
- // few chunks in a piece.
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.PieceAllowedFast(p.index) && peer.Choking) {
- continue
- }
- if skipped > pendingChunksRemaining {
- break
- }
- if !peer.HasExistingRequest(req) {
- skipped++
- continue
- }
- if !peer.PieceAllowedFast(p.index) {
- // We must stay interested for this.
- peer.nextState.Interested = true
- }
- peer.addNextRequest(req)
- return
- }
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.HasPiece(p.index) {
- continue
+ if f := torrentPiece.IterPendingChunks; f != nil {
+ f(func(chunk types.ChunkSpec) {
+ req := Request{pp.Integer(p.index), chunk}
+ pendingChunksRemaining--
+ sortPeersForPiece()
+ spew.Dump(peersForPiece)
+ skipped := 0
+ // Try up to the number of peers that could legitimately receive the request equal to
+ // the number of chunks left. This should ensure that only the best peers serve the last
+ // few chunks in a piece.
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
+ continue
+ }
+ if skipped >= pendingChunksRemaining {
+ break
+ }
+ if f := peer.HasExistingRequest; f == nil || !f(req) {
+ skipped++
+ continue
+ }
+ if !peer.pieceAllowedFastOrDefault(p.index) {
+ // We must stay interested for this.
+ peer.nextState.Interested = true
+ }
+ peer.addNextRequest(req)
+ return
}
- if !peer.PieceAllowedFast(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast
- // pieces.
- peer.nextState.Interested = true
- if peer.Choking {
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() {
+ continue
+ }
+ if !peer.HasPiece(p.index) {
continue
}
+ if !peer.pieceAllowedFastOrDefault(p.index) {
+ // TODO: Verify that's okay to stay uninterested if we request allowed fast
+ // pieces.
+ peer.nextState.Interested = true
+ if peer.Choking {
+ continue
+ }
+ }
+ peer.addNextRequest(req)
+ return
}
- peer.addNextRequest(req)
- return
- }
- })
+ })
+ }
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
}
}
}
- ret := make(map[PeerPointer]PeerNextRequestState)
+ ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
for _, rp := range peers {
if rp.requestablePiecesRemaining != 0 {
package request_strategy
+
+import (
+ "math"
+ "testing"
+
+ pp "github.com/anacrolix/torrent/peer_protocol"
+ qt "github.com/frankban/quicktest"
+)
+
+func r(i pieceIndex, begin int) Request {
+ return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
+}
+
+func chunkIter(offsets ...int) func(func(ChunkSpec)) {
+ return func(f func(ChunkSpec)) {
+ for _, offset := range offsets {
+ f(ChunkSpec{pp.Integer(offset), 1})
+ }
+ }
+}
+
+func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) {
+ ret = make(map[Request]struct{}, len(rs))
+ for _, r := range rs {
+ ret[r] = struct{}{}
+ }
+ return
+}
+
+type intPeerId int
+
+func (i intPeerId) Uintptr() uintptr {
+ return uintptr(i)
+}
+
+func TestStealingFromSlowerPeers(t *testing.T) {
+ c := qt.New(t)
+ order := ClientPieceOrder{}
+ basePeer := Peer{
+ HasPiece: func(i pieceIndex) bool {
+ return true
+ },
+ MaxRequests: math.MaxInt16,
+ DownloadRate: 2,
+ }
+ stealee := basePeer
+ stealee.DownloadRate = 1
+ stealee.HasExistingRequest = func(r Request) bool {
+ return true
+ }
+ stealee.Id = intPeerId(1)
+ firstStealer := basePeer
+ firstStealer.Id = intPeerId(2)
+ secondStealer := basePeer
+ secondStealer.Id = intPeerId(3)
+ c.Assert(order.DoRequests([]*Torrent{{
+ Pieces: []Piece{{
+ Request: true,
+ NumPendingChunks: 2,
+ IterPendingChunks: chunkIter(0, 1),
+ }},
+ Peers: []Peer{
+ stealee,
+ firstStealer,
+ secondStealer,
+ },
+ }}), qt.ContentEquals, map[PeerId]PeerNextRequestState{
+ intPeerId(2): {
+ Interested: true,
+ Requests: requestSetFromSlice(r(0, 0)),
+ },
+ intPeerId(3): {
+ Interested: true,
+ Requests: requestSetFromSlice(r(0, 1)),
+ },
+ stealee.Id: {
+ Interested: false,
+ Requests: requestSetFromSlice(),
+ },
+ })
+}