1 package request_strategy
6 "github.com/anacrolix/multiless"
7 pp "github.com/anacrolix/torrent/peer_protocol"
8 "github.com/anacrolix/torrent/types"
9 "github.com/davecgh/go-spew/spew"
13 Request = types.Request
14 pieceIndex = types.PieceIndex
15 piecePriority = types.PiecePriority
16 // This can be made into a type-param later, will be great for testing.
17 ChunkSpec = types.ChunkSpec
20 type ClientPieceOrder struct {
21 pieces []pieceRequestOrderPiece
24 type pieceRequestOrderPiece struct {
30 func (me *ClientPieceOrder) Len() int {
34 func (me ClientPieceOrder) sort() {
35 sort.Slice(me.pieces, me.less)
38 func (me ClientPieceOrder) less(_i, _j int) bool {
41 return multiless.New().Int(
42 int(j.Priority), int(i.Priority),
45 ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
48 type requestsPeer struct {
50 nextState PeerNextRequestState
51 requestablePiecesRemaining int
54 func (rp *requestsPeer) canFitRequest() bool {
55 return len(rp.nextState.Requests) < rp.MaxRequests
58 // Returns true if it is added and wasn't there before.
59 func (rp *requestsPeer) addNextRequest(r Request) bool {
60 _, ok := rp.nextState.Requests[r]
64 rp.nextState.Requests[r] = struct{}{}
68 type peersForPieceRequests struct {
73 func (me *peersForPieceRequests) addNextRequest(r Request) {
74 if me.requestsPeer.addNextRequest(r) {
81 Capacity *func() *int64
82 Peers []Peer // not closed.
85 func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
86 requestOrder.pieces = requestOrder.pieces[:0]
87 allPeers := make(map[*Torrent][]*requestsPeer)
88 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
90 storageLeft := make(map[*func() *int64]*int64)
91 for _, t := range torrents {
92 // TODO: We could do metainfo requests here.
95 if _, ok := storageLeft[key]; !ok {
96 storageLeft[key] = (*key)()
99 var peers []*requestsPeer
100 for _, p := range t.Peers {
101 peers = append(peers, &requestsPeer{
103 nextState: PeerNextRequestState{
104 Requests: make(map[Request]struct{}),
108 for i, tp := range t.Pieces {
109 requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
115 for _, p := range peers {
116 if p.canRequestPiece(i) {
117 p.requestablePiecesRemaining++
125 for _, p := range requestOrder.pieces {
127 if left := storageLeft[p.t.Capacity]; left != nil {
128 if *left < int64(torrentPiece.Length) {
131 *left -= int64(torrentPiece.Length)
136 peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
137 for _, peer := range allPeers[p.t] {
138 peersForPiece = append(peersForPiece, &peersForPieceRequests{
143 sortPeersForPiece := func() {
144 sort.Slice(peersForPiece, func(i, j int) bool {
145 return multiless.New().Int(
146 peersForPiece[i].requestsInPiece,
147 peersForPiece[j].requestsInPiece,
149 peersForPiece[i].requestablePiecesRemaining,
150 peersForPiece[j].requestablePiecesRemaining,
152 peersForPiece[j].DownloadRate,
153 peersForPiece[i].DownloadRate,
155 int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
156 // TODO: Probably peer priority can come next
158 peersForPiece[i].Id.Uintptr(),
159 peersForPiece[j].Id.Uintptr(),
163 pendingChunksRemaining := int(p.NumPendingChunks)
164 if f := torrentPiece.IterPendingChunks; f != nil {
165 f(func(chunk types.ChunkSpec) {
166 req := Request{pp.Integer(p.index), chunk}
167 pendingChunksRemaining--
169 spew.Dump(peersForPiece)
171 // Try up to the number of peers that could legitimately receive the request equal to
172 // the number of chunks left. This should ensure that only the best peers serve the last
173 // few chunks in a piece.
174 for _, peer := range peersForPiece {
175 if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
178 if skipped >= pendingChunksRemaining {
181 if f := peer.HasExistingRequest; f == nil || !f(req) {
185 if !peer.pieceAllowedFastOrDefault(p.index) {
186 // We must stay interested for this.
187 peer.nextState.Interested = true
189 peer.addNextRequest(req)
192 for _, peer := range peersForPiece {
193 if !peer.canFitRequest() {
196 if !peer.HasPiece(p.index) {
199 if !peer.pieceAllowedFastOrDefault(p.index) {
200 // TODO: Verify that's okay to stay uninterested if we request allowed fast
202 peer.nextState.Interested = true
207 peer.addNextRequest(req)
212 if pendingChunksRemaining != 0 {
213 panic(pendingChunksRemaining)
215 for _, peer := range peersForPiece {
216 if peer.canRequestPiece(p.index) {
217 peer.requestablePiecesRemaining--
221 ret := make(map[PeerId]PeerNextRequestState)
222 for _, peers := range allPeers {
223 for _, rp := range peers {
224 if rp.requestablePiecesRemaining != 0 {
225 panic(rp.requestablePiecesRemaining)
227 ret[rp.Id] = rp.nextState