Sergey Matveev's repositories - btrtrc.git/blob - request-strategy/order.go
9052fb0cc5514bf9dd62bdc20e90649184878415
[btrtrc.git] / request-strategy / order.go
1 package request_strategy
2
3 import (
4         "sort"
5
6         "github.com/anacrolix/multiless"
7         pp "github.com/anacrolix/torrent/peer_protocol"
8         "github.com/anacrolix/torrent/types"
9 )
10
// Package-local aliases for the shared definitions in the types package: Request, ChunkSpec, and
// the unexported pieceIndex and piecePriority.
type (
	Request       = types.Request
	pieceIndex    = types.PieceIndex
	piecePriority = types.PiecePriority
	// This can be made into a type-param later, will be great for testing.
	ChunkSpec = types.ChunkSpec
)
18
// ClientPieceOrder is a field-less receiver type for DoRequests, which computes the next request
// state of every peer across a set of torrents.
type ClientPieceOrder struct{}
20
// filterTorrent wraps a Torrent with the running totals getRequestablePieces maintains while it
// walks the sorted pieces.
type filterTorrent struct {
	Torrent
	// Sum of the Length of this torrent's pieces already selected in the current run; checked
	// against MaxUnverifiedBytes.
	unverifiedBytes int64
	// Potentially shared with other torrents.
	// Nil when the torrent's Capacity is nil; otherwise the remaining storage budget, decremented
	// by the Length of each piece that fits.
	storageLeft *int64
}
27
28 func sortFilterPieces(pieces []filterPiece) {
29         sort.Slice(pieces, func(_i, _j int) bool {
30                 i := pieces[_i]
31                 j := pieces[_j]
32                 return multiless.New().Int(
33                         int(j.Priority), int(i.Priority),
34                 ).Bool(
35                         j.Partial, i.Partial,
36                 ).Int64(
37                         i.Availability, j.Availability,
38                 ).Int(
39                         i.index, j.index,
40                 ).Uintptr(
41                         i.t.StableId, j.t.StableId,
42                 ).MustLess()
43         })
44 }
45
// requestsPeer pairs a Peer with the state built up for it during one DoRequests run.
//
// nextState accumulates the requests (and the Interested flag) chosen for the peer and is what
// DoRequests returns for it. requestablePiecesRemaining is incremented in DoRequests for each
// requestable piece the peer canRequestPiece, and decremented again as allocatePendingChunks
// finishes each such piece; DoRequests panics if it is non-zero at the end.
type requestsPeer struct {
	Peer
	nextState                  PeerNextRequestState
	requestablePiecesRemaining int
}
51
52 func (rp *requestsPeer) canFitRequest() bool {
53         return len(rp.nextState.Requests) < rp.MaxRequests
54 }
55
56 func (rp *requestsPeer) addNextRequest(r Request) {
57         _, ok := rp.nextState.Requests[r]
58         if ok {
59                 panic("should only add once")
60         }
61         rp.nextState.Requests[r] = struct{}{}
62 }
63
// peersForPieceRequests is a per-piece view of a requestsPeer used by allocatePendingChunks.
// requestsInPiece counts the requests added to the peer through this wrapper for the piece
// currently being allocated.
type peersForPieceRequests struct {
	requestsInPiece int
	*requestsPeer
}
68
69 func (me *peersForPieceRequests) addNextRequest(r Request) {
70         me.requestsPeer.addNextRequest(r)
71         me.requestsInPiece++
72 }
73
// requestablePiece is a piece that passed the filters in getRequestablePieces: its index, the
// Torrent it belongs to, the number of pending chunks, and an iterator over those chunks.
type requestablePiece struct {
	index             pieceIndex
	t                 Torrent
	NumPendingChunks  int
	IterPendingChunks ChunksIter
}
80
// filterPiece is a Piece tagged with its index and its owning filterTorrent. The torrent is held
// by pointer, so every piece of a torrent shares that torrent's running totals.
type filterPiece struct {
	t     *filterTorrent
	index pieceIndex
	Piece
}
86
87 func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
88         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
89         // TorrentImpl.
90         storageLeft := make(map[*func() *int64]*int64)
91         var pieces []filterPiece
92         for _, _t := range torrents {
93                 // TODO: We could do metainfo requests here.
94                 t := &filterTorrent{
95                         Torrent:         _t,
96                         unverifiedBytes: 0,
97                 }
98                 key := t.Capacity
99                 if key != nil {
100                         if _, ok := storageLeft[key]; !ok {
101                                 storageLeft[key] = (*key)()
102                         }
103                         t.storageLeft = storageLeft[key]
104                 }
105                 for i, tp := range t.Pieces {
106                         pieces = append(pieces, filterPiece{
107                                 t:     t,
108                                 index: i,
109                                 Piece: tp,
110                         })
111                 }
112         }
113         sortFilterPieces(pieces)
114         for _, piece := range pieces {
115                 if left := piece.t.storageLeft; left != nil {
116                         if *left < int64(piece.Length) {
117                                 continue
118                         }
119                         *left -= int64(piece.Length)
120                 }
121                 if !piece.Request || piece.NumPendingChunks == 0 {
122                         continue
123                 }
124                 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
125                         continue
126                 }
127                 piece.t.unverifiedBytes += piece.Length
128                 ret = append(ret, requestablePiece{
129                         index:             piece.index,
130                         t:                 piece.t.Torrent,
131                         NumPendingChunks:  piece.NumPendingChunks,
132                         IterPendingChunks: piece.iterPendingChunksWrapper,
133                 })
134         }
135         return
136 }
137
138 // TODO: We could do metainfo requests here.
139 func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState {
140         requestPieces := getRequestablePieces(torrents)
141         allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
142         for _, t := range torrents {
143                 peers := make([]*requestsPeer, 0, len(t.Peers))
144                 for _, p := range t.Peers {
145                         peers = append(peers, &requestsPeer{
146                                 Peer: p,
147                                 nextState: PeerNextRequestState{
148                                         Requests: make(map[Request]struct{}),
149                                 },
150                         })
151                 }
152                 allPeers[t.StableId] = peers
153         }
154         for _, piece := range requestPieces {
155                 for _, peer := range allPeers[piece.t.StableId] {
156                         if peer.canRequestPiece(piece.index) {
157                                 peer.requestablePiecesRemaining++
158                         }
159                 }
160         }
161         for _, piece := range requestPieces {
162                 allocatePendingChunks(piece, allPeers[piece.t.StableId])
163         }
164         ret := make(map[PeerId]PeerNextRequestState)
165         for _, peers := range allPeers {
166                 for _, rp := range peers {
167                         if rp.requestablePiecesRemaining != 0 {
168                                 panic(rp.requestablePiecesRemaining)
169                         }
170                         ret[rp.Id] = rp.nextState
171                 }
172         }
173         return ret
174 }
175
176 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
177         peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
178         for _, peer := range peers {
179                 peersForPiece = append(peersForPiece, &peersForPieceRequests{
180                         requestsInPiece: 0,
181                         requestsPeer:    peer,
182                 })
183         }
184         defer func() {
185                 for _, peer := range peersForPiece {
186                         if peer.canRequestPiece(p.index) {
187                                 peer.requestablePiecesRemaining--
188                         }
189                 }
190         }()
191         sortPeersForPiece := func(byHasRequest *Request) {
192                 sort.Slice(peersForPiece, func(i, j int) bool {
193                         ml := multiless.New().Int(
194                                 peersForPiece[i].requestsInPiece,
195                                 peersForPiece[j].requestsInPiece,
196                         ).Int(
197                                 peersForPiece[i].requestablePiecesRemaining,
198                                 peersForPiece[j].requestablePiecesRemaining,
199                         ).Float64(
200                                 peersForPiece[j].DownloadRate,
201                                 peersForPiece[i].DownloadRate,
202                         )
203                         if byHasRequest != nil {
204                                 _, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
205                                 _, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
206                                 ml = ml.Bool(jHas, iHas)
207                         }
208                         return ml.Int64(
209                                 int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
210                                 // TODO: Probably peer priority can come next
211                         ).Uintptr(
212                                 peersForPiece[i].Id.Uintptr(),
213                                 peersForPiece[j].Id.Uintptr(),
214                         ).MustLess()
215                 })
216         }
217         preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
218         p.IterPendingChunks(func(spec ChunkSpec) {
219                 req := Request{pp.Integer(p.index), spec}
220                 for _, peer := range peersForPiece {
221                         if h := peer.HasExistingRequest; h == nil || !h(req) {
222                                 continue
223                         }
224                         if !peer.canFitRequest() {
225                                 continue
226                         }
227                         if !peer.canRequestPiece(p.index) {
228                                 continue
229                         }
230                         preallocated[spec] = peer
231                         peer.addNextRequest(req)
232                 }
233         })
234         pendingChunksRemaining := int(p.NumPendingChunks)
235         p.IterPendingChunks(func(chunk types.ChunkSpec) {
236                 if _, ok := preallocated[chunk]; ok {
237                         return
238                 }
239                 req := Request{pp.Integer(p.index), chunk}
240                 defer func() { pendingChunksRemaining-- }()
241                 sortPeersForPiece(nil)
242                 for _, peer := range peersForPiece {
243                         if !peer.canFitRequest() {
244                                 continue
245                         }
246                         if !peer.HasPiece(p.index) {
247                                 continue
248                         }
249                         if !peer.pieceAllowedFastOrDefault(p.index) {
250                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
251                                 peer.nextState.Interested = true
252                                 if peer.Choking {
253                                         continue
254                                 }
255                         }
256                         peer.addNextRequest(req)
257                         break
258                 }
259         })
260 chunk:
261         for chunk, prePeer := range preallocated {
262                 pendingChunksRemaining--
263                 req := Request{pp.Integer(p.index), chunk}
264                 prePeer.requestsInPiece--
265                 sortPeersForPiece(&req)
266                 delete(prePeer.nextState.Requests, req)
267                 for _, peer := range peersForPiece {
268                         if !peer.canFitRequest() {
269                                 continue
270                         }
271                         if !peer.HasPiece(p.index) {
272                                 continue
273                         }
274                         if !peer.pieceAllowedFastOrDefault(p.index) {
275                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
276                                 peer.nextState.Interested = true
277                                 if peer.Choking {
278                                         continue
279                                 }
280                         }
281                         peer.addNextRequest(req)
282                         continue chunk
283                 }
284         }
285         if pendingChunksRemaining != 0 {
286                 panic(pendingChunksRemaining)
287         }
288 }