Sergey Matveev's repositories - btrtrc.git/blob - request-strategy/order.go
Add client-level max unverified bytes
[btrtrc.git] / request-strategy / order.go
1 package request_strategy
2
3 import (
4         "sort"
5
6         "github.com/anacrolix/multiless"
7         pp "github.com/anacrolix/torrent/peer_protocol"
8         "github.com/anacrolix/torrent/types"
9 )
10
// Local aliases for types declared in the types package, so the rest of this package can refer to
// them by short names.
type (
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later, will be great for testing.
	ChunkSpec = types.ChunkSpec
)
18
// ClientPieceOrder is an empty struct type.
// NOTE(review): nothing in the visible part of this file references it; presumably a placeholder
// or used elsewhere in the package — confirm.
type ClientPieceOrder struct{}
20
// filterTorrent wraps a Torrent with the per-run bookkeeping that getRequestablePieces keeps while
// deciding which pieces are requestable.
type filterTorrent struct {
	Torrent
	// Sum of the Length of this torrent's pieces accepted so far in the current run. It is checked
	// against the torrent's MaxUnverifiedBytes before each further piece is accepted.
	unverifiedBytes int64
	// Storage budget remaining for this run, decremented as pieces are considered. Nil when no
	// budget applies to the torrent.
	// Potentially shared with other torrents.
	storageLeft *int64
}
27
28 func sortFilterPieces(pieces []filterPiece) {
29         sort.Slice(pieces, func(_i, _j int) bool {
30                 i := pieces[_i]
31                 j := pieces[_j]
32                 return multiless.New().Int(
33                         int(j.Priority), int(i.Priority),
34                 ).Bool(
35                         j.Partial, i.Partial,
36                 ).Int64(
37                         i.Availability, j.Availability,
38                 ).Int(
39                         i.index, j.index,
40                 ).Uintptr(
41                         i.t.StableId, j.t.StableId,
42                 ).MustLess()
43         })
44 }
45
// requestsPeer pairs a Peer with the state built up for it during a single Run.
//
// nextState is the request state being assembled for the peer; Run returns it keyed by the peer's
// Id. requestablePiecesRemaining counts the requestable pieces this peer can request that have not
// yet been processed by allocatePendingChunks: Run increments it once per such piece up front,
// allocatePendingChunks decrements it when it finishes a piece, and Run panics if it is non-zero
// at the end. It is also one of the sort keys used when choosing a peer for a chunk.
type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}
51
52 func (rp *requestsPeer) canFitRequest() bool {
53         return len(rp.nextState.Requests) < rp.MaxRequests
54 }
55
56 func (rp *requestsPeer) addNextRequest(r Request) {
57         _, ok := rp.nextState.Requests[r]
58         if ok {
59                 panic("should only add once")
60         }
61         rp.nextState.Requests[r] = struct{}{}
62 }
63
// peersForPieceRequests is the per-piece view of a peer used inside allocatePendingChunks.
// requestsInPiece counts the requests for the piece currently being allocated that have been
// given to the embedded peer; it is the first sort key when picking a peer for the next chunk.
type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}
68
69 func (me *peersForPieceRequests) addNextRequest(r Request) {
70         me.requestsPeer.addNextRequest(r)
71         me.requestsInPiece++
72 }
73
// requestablePiece is a piece that passed every filter in getRequestablePieces and is ready to
// have its chunks allocated to peers.
//
// index is the piece's position within t's Pieces, and t is the torrent it belongs to.
// NumPendingChunks and IterPendingChunks are taken from the source Piece; allocatePendingChunks
// iterates the chunks through IterPendingChunks and panics if the chunks it accounts for do not
// add up to NumPendingChunks.
type requestablePiece struct {
	index             pieceIndex
	t                 Torrent
	NumPendingChunks  int
	IterPendingChunks ChunksIter
}
80
// filterPiece is a Piece tagged with its owning torrent (t) and its position within that
// torrent's Pieces (index), so that the pieces of all torrents can be sorted and filtered as one
// list.
type filterPiece struct {
	t     *filterTorrent
	index pieceIndex
	Piece
}
86
87 func getRequestablePieces(input Input) (ret []requestablePiece) {
88         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
89         // TorrentImpl.
90         storageLeft := make(map[*func() *int64]*int64)
91         var pieces []filterPiece
92         for _, _t := range input.Torrents {
93                 // TODO: We could do metainfo requests here.
94                 t := &filterTorrent{
95                         Torrent:         _t,
96                         unverifiedBytes: 0,
97                 }
98                 key := t.Capacity
99                 if key != nil {
100                         if _, ok := storageLeft[key]; !ok {
101                                 storageLeft[key] = (*key)()
102                         }
103                         t.storageLeft = storageLeft[key]
104                 }
105                 for i, tp := range t.Pieces {
106                         pieces = append(pieces, filterPiece{
107                                 t:     t,
108                                 index: i,
109                                 Piece: tp,
110                         })
111                 }
112         }
113         sortFilterPieces(pieces)
114         var allTorrentsUnverifiedBytes int64
115         for _, piece := range pieces {
116                 if left := piece.t.storageLeft; left != nil {
117                         if *left < int64(piece.Length) {
118                                 continue
119                         }
120                         *left -= int64(piece.Length)
121                 }
122                 if !piece.Request || piece.NumPendingChunks == 0 {
123                         // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
124                         // considered unverified and hold up further requests.
125                         continue
126                 }
127                 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
128                         continue
129                 }
130                 if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
131                         continue
132                 }
133                 piece.t.unverifiedBytes += piece.Length
134                 allTorrentsUnverifiedBytes += piece.Length
135                 ret = append(ret, requestablePiece{
136                         index:             piece.index,
137                         t:                 piece.t.Torrent,
138                         NumPendingChunks:  piece.NumPendingChunks,
139                         IterPendingChunks: piece.iterPendingChunksWrapper,
140                 })
141         }
142         return
143 }
144
// Input is the argument to Run. Torrents are the torrents to compute requests for.
// MaxUnverifiedBytes caps the summed Length of the pieces selected across all torrents in one
// run; zero disables the cap.
type Input struct {
	Torrents           []Torrent
	MaxUnverifiedBytes int64
}
149
150 // TODO: We could do metainfo requests here.
151 func Run(input Input) map[PeerId]PeerNextRequestState {
152         requestPieces := getRequestablePieces(input)
153         torrents := input.Torrents
154         allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
155         for _, t := range torrents {
156                 peers := make([]*requestsPeer, 0, len(t.Peers))
157                 for _, p := range t.Peers {
158                         peers = append(peers, &requestsPeer{
159                                 Peer: p,
160                                 nextState: PeerNextRequestState{
161                                         Requests: make(map[Request]struct{}),
162                                 },
163                         })
164                 }
165                 allPeers[t.StableId] = peers
166         }
167         for _, piece := range requestPieces {
168                 for _, peer := range allPeers[piece.t.StableId] {
169                         if peer.canRequestPiece(piece.index) {
170                                 peer.requestablePiecesRemaining++
171                         }
172                 }
173         }
174         for _, piece := range requestPieces {
175                 allocatePendingChunks(piece, allPeers[piece.t.StableId])
176         }
177         ret := make(map[PeerId]PeerNextRequestState)
178         for _, peers := range allPeers {
179                 for _, rp := range peers {
180                         if rp.requestablePiecesRemaining != 0 {
181                                 panic(rp.requestablePiecesRemaining)
182                         }
183                         ret[rp.Id] = rp.nextState
184                 }
185         }
186         return ret
187 }
188
189 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
190         peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
191         for _, peer := range peers {
192                 peersForPiece = append(peersForPiece, &peersForPieceRequests{
193                         requestsInPiece: 0,
194                         requestsPeer:    peer,
195                 })
196         }
197         defer func() {
198                 for _, peer := range peersForPiece {
199                         if peer.canRequestPiece(p.index) {
200                                 peer.requestablePiecesRemaining--
201                         }
202                 }
203         }()
204         sortPeersForPiece := func(byHasRequest *Request) {
205                 sort.Slice(peersForPiece, func(i, j int) bool {
206                         ml := multiless.New().Int(
207                                 peersForPiece[i].requestsInPiece,
208                                 peersForPiece[j].requestsInPiece,
209                         ).Int(
210                                 peersForPiece[i].requestablePiecesRemaining,
211                                 peersForPiece[j].requestablePiecesRemaining,
212                         ).Float64(
213                                 peersForPiece[j].DownloadRate,
214                                 peersForPiece[i].DownloadRate,
215                         )
216                         if byHasRequest != nil {
217                                 _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
218                                 _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
219                                 ml = ml.Bool(jHas, iHas)
220                         }
221                         return ml.Int64(
222                                 int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
223                                 // TODO: Probably peer priority can come next
224                         ).Uintptr(
225                                 peersForPiece[i].Id.Uintptr(),
226                                 peersForPiece[j].Id.Uintptr(),
227                         ).MustLess()
228                 })
229         }
230         preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
231         p.IterPendingChunks(func(spec ChunkSpec) {
232                 req := Request{pp.Integer(p.index), spec}
233                 for _, peer := range peersForPiece {
234                         if h := peer.HasExistingRequest; h == nil || !h(req) {
235                                 continue
236                         }
237                         if !peer.canFitRequest() {
238                                 continue
239                         }
240                         if !peer.canRequestPiece(p.index) {
241                                 continue
242                         }
243                         preallocated[spec] = peer
244                         peer.addNextRequest(req)
245                 }
246         })
247         pendingChunksRemaining := int(p.NumPendingChunks)
248         p.IterPendingChunks(func(chunk types.ChunkSpec) {
249                 if _, ok := preallocated[chunk]; ok {
250                         return
251                 }
252                 req := Request{pp.Integer(p.index), chunk}
253                 defer func() { pendingChunksRemaining-- }()
254                 sortPeersForPiece(nil)
255                 for _, peer := range peersForPiece {
256                         if !peer.canFitRequest() {
257                                 continue
258                         }
259                         if !peer.HasPiece(p.index) {
260                                 continue
261                         }
262                         if !peer.pieceAllowedFastOrDefault(p.index) {
263                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
264                                 peer.nextState.Interested = true
265                                 if peer.Choking {
266                                         continue
267                                 }
268                         }
269                         peer.addNextRequest(req)
270                         break
271                 }
272         })
273 chunk:
274         for chunk, prePeer := range preallocated {
275                 pendingChunksRemaining--
276                 req := Request{pp.Integer(p.index), chunk}
277                 prePeer.requestsInPiece--
278                 sortPeersForPiece(&req)
279                 delete(prePeer.nextState.Requests, req)
280                 for _, peer := range peersForPiece {
281                         if !peer.canFitRequest() {
282                                 continue
283                         }
284                         if !peer.HasPiece(p.index) {
285                                 continue
286                         }
287                         if !peer.pieceAllowedFastOrDefault(p.index) {
288                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
289                                 peer.nextState.Interested = true
290                                 if peer.Choking {
291                                         continue
292                                 }
293                         }
294                         peer.addNextRequest(req)
295                         continue chunk
296                 }
297         }
298         if pendingChunksRemaining != 0 {
299                 panic(pendingChunksRemaining)
300         }
301 }