]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy/order.go
Move piece allocation to its own func
[btrtrc.git] / request-strategy / order.go
1 package request_strategy
2
3 import (
4         "math"
5         "sort"
6
7         "github.com/anacrolix/multiless"
8         pp "github.com/anacrolix/torrent/peer_protocol"
9         "github.com/anacrolix/torrent/types"
10 )
11
// Aliases into the shared types package so this package's code can refer to
// them without qualification.
type (
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later, will be great for testing.
	ChunkSpec = types.ChunkSpec
)
19
// ClientPieceOrder holds the flattened, sortable list of pieces across all
// torrents for a request-allocation run.
type ClientPieceOrder struct {
	// Every piece from every torrent passed to DoRequests; rebuilt and
	// re-sorted on each run.
	pieces []pieceRequestOrderPiece
}
23
// pieceRequestOrderPiece pairs a Piece with the torrent it belongs to and its
// index within that torrent, so pieces from different torrents can be ordered
// in a single slice.
type pieceRequestOrderPiece struct {
	t     *Torrent
	index pieceIndex
	Piece
}
29
30 func (me *ClientPieceOrder) Len() int {
31         return len(me.pieces)
32 }
33
34 func (me ClientPieceOrder) sort() {
35         sort.Slice(me.pieces, me.less)
36 }
37
38 func (me ClientPieceOrder) less(_i, _j int) bool {
39         i := me.pieces[_i]
40         j := me.pieces[_j]
41         return multiless.New().Int(
42                 int(j.Priority), int(i.Priority),
43         ).Bool(
44                 j.Partial, i.Partial,
45         ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
46 }
47
// requestsPeer couples a Peer with the request state accumulated for it during
// a single allocation run.
type requestsPeer struct {
	Peer
	// The request set and interest flag that will be returned for this peer.
	nextState PeerNextRequestState
	// How many requestable pieces this peer could still serve; decremented as
	// pieces are processed and expected to reach zero by the end of a run.
	requestablePiecesRemaining int
}
53
54 func (rp *requestsPeer) canFitRequest() bool {
55         return len(rp.nextState.Requests) < rp.MaxRequests
56 }
57
58 // Returns true if it is added and wasn't there before.
59 func (rp *requestsPeer) addNextRequest(r Request) bool {
60         _, ok := rp.nextState.Requests[r]
61         if ok {
62                 return false
63         }
64         rp.nextState.Requests[r] = struct{}{}
65         return true
66 }
67
// peersForPieceRequests augments a requestsPeer with a per-piece request
// count, used while distributing the chunks of a single piece across peers.
type peersForPieceRequests struct {
	// Requests added to this peer for the piece currently being allocated.
	requestsInPiece int
	*requestsPeer
}
72
73 func (me *peersForPieceRequests) addNextRequest(r Request) {
74         if me.requestsPeer.addNextRequest(r) {
75                 me.requestsInPiece++
76         }
77 }
78
// Torrent is the input view of a torrent for request ordering: its pieces,
// the peers to spread requests over, and an optional storage-capacity getter.
type Torrent struct {
	Pieces []Piece
	// Pointer to a function returning remaining storage capacity, if bounded.
	// Torrents sharing the same pointer draw from the same capacity pool.
	Capacity *func() *int64
	Peers    []Peer // not closed.
}
84
85 func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
86         requestOrder.pieces = requestOrder.pieces[:0]
87         allPeers := make(map[*Torrent][]*requestsPeer)
88         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
89         // TorrentImpl.
90         storageLeft := make(map[*func() *int64]*int64)
91         for _, t := range torrents {
92                 // TODO: We could do metainfo requests here.
93                 key := t.Capacity
94                 if key != nil {
95                         if _, ok := storageLeft[key]; !ok {
96                                 storageLeft[key] = (*key)()
97                         }
98                 }
99                 var peers []*requestsPeer
100                 for _, p := range t.Peers {
101                         peers = append(peers, &requestsPeer{
102                                 Peer: p,
103                                 nextState: PeerNextRequestState{
104                                         Requests: make(map[Request]struct{}),
105                                 },
106                         })
107                 }
108                 for i, tp := range t.Pieces {
109                         requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
110                                 t:     t,
111                                 index: i,
112                                 Piece: tp,
113                         })
114                         if tp.Request {
115                                 for _, p := range peers {
116                                         if p.canRequestPiece(i) {
117                                                 p.requestablePiecesRemaining++
118                                         }
119                                 }
120                         }
121                 }
122                 allPeers[t] = peers
123         }
124         requestOrder.sort()
125         for _, p := range requestOrder.pieces {
126                 torrentPiece := p
127                 if left := storageLeft[p.t.Capacity]; left != nil {
128                         if *left < int64(torrentPiece.Length) {
129                                 continue
130                         }
131                         *left -= int64(torrentPiece.Length)
132                 }
133                 if !p.Request {
134                         continue
135                 }
136                 allocatePendingChunks(p, allPeers[p.t])
137         }
138         ret := make(map[PeerId]PeerNextRequestState)
139         for _, peers := range allPeers {
140                 for _, rp := range peers {
141                         if rp.requestablePiecesRemaining != 0 {
142                                 panic(rp.requestablePiecesRemaining)
143                         }
144                         ret[rp.Id] = rp.nextState
145                 }
146         }
147         return ret
148 }
149
// allocatePendingChunks distributes requests for the pending chunks of piece p
// across the given peers. Before each chunk, peers are re-sorted to favour
// those with fewer requests in this piece, fewer requestable pieces remaining,
// higher DownloadRate, and greater Age, in that order.
func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
	// Wrap each peer with a per-piece request counter for the sort below.
	peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
	for _, peer := range peers {
		peersForPiece = append(peersForPiece, &peersForPieceRequests{
			requestsInPiece: 0,
			requestsPeer:    peer,
		})
	}
	sortPeersForPiece := func() {
		sort.Slice(peersForPiece, func(i, j int) bool {
			return multiless.New().Int(
				peersForPiece[i].requestsInPiece,
				peersForPiece[j].requestsInPiece,
			).Int(
				peersForPiece[i].requestablePiecesRemaining,
				peersForPiece[j].requestablePiecesRemaining,
			).Float64(
				// Reversed: higher download rate sorts earlier.
				peersForPiece[j].DownloadRate,
				peersForPiece[i].DownloadRate,
			).Int64(
				// Reversed: greater Age sorts earlier.
				int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
				// TODO: Probably peer priority can come next
			).Uintptr(
				// Final tie-break on peer identity for determinism.
				peersForPiece[i].Id.Uintptr(),
				peersForPiece[j].Id.Uintptr(),
			).MustLess()
		})
	}
	pendingChunksRemaining := int(p.NumPendingChunks)
	if f := p.IterPendingChunks; f != nil {
		f(func(chunk types.ChunkSpec) {
			req := Request{pp.Integer(p.index), chunk}
			// Decrement after both passes below, whichever way they exit.
			defer func() { pendingChunksRemaining-- }()
			sortPeersForPiece()
			skipped := 0
			// Try up to the number of peers that could legitimately receive the request equal to
			// the number of chunks left. This should ensure that only the best peers serve the last
			// few chunks in a piece.
			lowestNumRequestsInPiece := math.MaxInt16
			// First pass: only a peer that already has this request gets it,
			// so existing requests are kept rather than churned between peers.
			for _, peer := range peersForPiece {
				if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
					continue
				}
				if skipped+1 >= pendingChunksRemaining {
					break
				}
				if f := peer.HasExistingRequest; f == nil || !f(req) {
					// Eligible but has no existing request: remember its
					// per-piece count and move on.
					skipped++
					lowestNumRequestsInPiece = peer.requestsInPiece
					continue
				}
				if peer.requestsInPiece > lowestNumRequestsInPiece {
					// A skipped peer had fewer requests in this piece; fall
					// through to the second pass instead.
					break
				}
				if !peer.pieceAllowedFastOrDefault(p.index) {
					// We must stay interested for this.
					peer.nextState.Interested = true
				}
				peer.addNextRequest(req)
				return
			}
			// Second pass: give the chunk to the best-sorted peer that can
			// take it at all, regardless of existing requests.
			for _, peer := range peersForPiece {
				if !peer.canFitRequest() {
					continue
				}
				if !peer.HasPiece(p.index) {
					continue
				}
				if !peer.pieceAllowedFastOrDefault(p.index) {
					// TODO: Verify that's okay to stay uninterested if we request allowed fast
					// pieces.
					peer.nextState.Interested = true
					if peer.Choking {
						continue
					}
				}
				peer.addNextRequest(req)
				return
			}
		})
	}
	// Every pending chunk must have been visited exactly once by the iterator.
	if pendingChunksRemaining != 0 {
		panic(pendingChunksRemaining)
	}
	// This piece has now been handled for every peer that could request it.
	for _, peer := range peersForPiece {
		if peer.canRequestPiece(p.index) {
			peer.requestablePiecesRemaining--
		}
	}
}