]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy/order.go
Add a working request strategy test
[btrtrc.git] / request-strategy / order.go
1 package request_strategy
2
3 import (
4         "sort"
5
6         "github.com/anacrolix/multiless"
7         pp "github.com/anacrolix/torrent/peer_protocol"
8         "github.com/anacrolix/torrent/types"
9         "github.com/davecgh/go-spew/spew"
10 )
11
12 type (
13         Request       = types.Request
14         pieceIndex    = types.PieceIndex
15         piecePriority = types.PiecePriority
16         // This can be made into a type-param later, will be great for testing.
17         ChunkSpec = types.ChunkSpec
18 )
19
20 type ClientPieceOrder struct {
21         pieces []pieceRequestOrderPiece
22 }
23
24 type pieceRequestOrderPiece struct {
25         t     *Torrent
26         index pieceIndex
27         Piece
28 }
29
30 func (me *ClientPieceOrder) Len() int {
31         return len(me.pieces)
32 }
33
34 func (me ClientPieceOrder) sort() {
35         sort.Slice(me.pieces, me.less)
36 }
37
38 func (me ClientPieceOrder) less(_i, _j int) bool {
39         i := me.pieces[_i]
40         j := me.pieces[_j]
41         return multiless.New().Int(
42                 int(j.Priority), int(i.Priority),
43         ).Bool(
44                 j.Partial, i.Partial,
45         ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
46 }
47
48 type requestsPeer struct {
49         Peer
50         nextState                  PeerNextRequestState
51         requestablePiecesRemaining int
52 }
53
54 func (rp *requestsPeer) canFitRequest() bool {
55         return len(rp.nextState.Requests) < rp.MaxRequests
56 }
57
58 // Returns true if it is added and wasn't there before.
59 func (rp *requestsPeer) addNextRequest(r Request) bool {
60         _, ok := rp.nextState.Requests[r]
61         if ok {
62                 return false
63         }
64         rp.nextState.Requests[r] = struct{}{}
65         return true
66 }
67
68 type peersForPieceRequests struct {
69         requestsInPiece int
70         *requestsPeer
71 }
72
73 func (me *peersForPieceRequests) addNextRequest(r Request) {
74         if me.requestsPeer.addNextRequest(r) {
75                 me.requestsInPiece++
76         }
77 }
78
79 type Torrent struct {
80         Pieces   []Piece
81         Capacity *func() *int64
82         Peers    []Peer // not closed.
83 }
84
85 func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
86         requestOrder.pieces = requestOrder.pieces[:0]
87         allPeers := make(map[*Torrent][]*requestsPeer)
88         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
89         // TorrentImpl.
90         storageLeft := make(map[*func() *int64]*int64)
91         for _, t := range torrents {
92                 // TODO: We could do metainfo requests here.
93                 key := t.Capacity
94                 if key != nil {
95                         if _, ok := storageLeft[key]; !ok {
96                                 storageLeft[key] = (*key)()
97                         }
98                 }
99                 var peers []*requestsPeer
100                 for _, p := range t.Peers {
101                         peers = append(peers, &requestsPeer{
102                                 Peer: p,
103                                 nextState: PeerNextRequestState{
104                                         Requests: make(map[Request]struct{}),
105                                 },
106                         })
107                 }
108                 for i, tp := range t.Pieces {
109                         requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
110                                 t:     t,
111                                 index: i,
112                                 Piece: tp,
113                         })
114                         if tp.Request {
115                                 for _, p := range peers {
116                                         if p.canRequestPiece(i) {
117                                                 p.requestablePiecesRemaining++
118                                         }
119                                 }
120                         }
121                 }
122                 allPeers[t] = peers
123         }
124         requestOrder.sort()
125         for _, p := range requestOrder.pieces {
126                 torrentPiece := p
127                 if left := storageLeft[p.t.Capacity]; left != nil {
128                         if *left < int64(torrentPiece.Length) {
129                                 continue
130                         }
131                         *left -= int64(torrentPiece.Length)
132                 }
133                 if !p.Request {
134                         continue
135                 }
136                 peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
137                 for _, peer := range allPeers[p.t] {
138                         peersForPiece = append(peersForPiece, &peersForPieceRequests{
139                                 requestsInPiece: 0,
140                                 requestsPeer:    peer,
141                         })
142                 }
143                 sortPeersForPiece := func() {
144                         sort.Slice(peersForPiece, func(i, j int) bool {
145                                 return multiless.New().Int(
146                                         peersForPiece[i].requestsInPiece,
147                                         peersForPiece[j].requestsInPiece,
148                                 ).Int(
149                                         peersForPiece[i].requestablePiecesRemaining,
150                                         peersForPiece[j].requestablePiecesRemaining,
151                                 ).Float64(
152                                         peersForPiece[j].DownloadRate,
153                                         peersForPiece[i].DownloadRate,
154                                 ).Int64(
155                                         int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
156                                         // TODO: Probably peer priority can come next
157                                 ).Uintptr(
158                                         peersForPiece[i].Id.Uintptr(),
159                                         peersForPiece[j].Id.Uintptr(),
160                                 ).MustLess()
161                         })
162                 }
163                 pendingChunksRemaining := int(p.NumPendingChunks)
164                 if f := torrentPiece.IterPendingChunks; f != nil {
165                         f(func(chunk types.ChunkSpec) {
166                                 req := Request{pp.Integer(p.index), chunk}
167                                 pendingChunksRemaining--
168                                 sortPeersForPiece()
169                                 spew.Dump(peersForPiece)
170                                 skipped := 0
171                                 // Try up to the number of peers that could legitimately receive the request equal to
172                                 // the number of chunks left. This should ensure that only the best peers serve the last
173                                 // few chunks in a piece.
174                                 for _, peer := range peersForPiece {
175                                         if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
176                                                 continue
177                                         }
178                                         if skipped >= pendingChunksRemaining {
179                                                 break
180                                         }
181                                         if f := peer.HasExistingRequest; f == nil || !f(req) {
182                                                 skipped++
183                                                 continue
184                                         }
185                                         if !peer.pieceAllowedFastOrDefault(p.index) {
186                                                 // We must stay interested for this.
187                                                 peer.nextState.Interested = true
188                                         }
189                                         peer.addNextRequest(req)
190                                         return
191                                 }
192                                 for _, peer := range peersForPiece {
193                                         if !peer.canFitRequest() {
194                                                 continue
195                                         }
196                                         if !peer.HasPiece(p.index) {
197                                                 continue
198                                         }
199                                         if !peer.pieceAllowedFastOrDefault(p.index) {
200                                                 // TODO: Verify that's okay to stay uninterested if we request allowed fast
201                                                 // pieces.
202                                                 peer.nextState.Interested = true
203                                                 if peer.Choking {
204                                                         continue
205                                                 }
206                                         }
207                                         peer.addNextRequest(req)
208                                         return
209                                 }
210                         })
211                 }
212                 if pendingChunksRemaining != 0 {
213                         panic(pendingChunksRemaining)
214                 }
215                 for _, peer := range peersForPiece {
216                         if peer.canRequestPiece(p.index) {
217                                 peer.requestablePiecesRemaining--
218                         }
219                 }
220         }
221         ret := make(map[PeerId]PeerNextRequestState)
222         for _, peers := range allPeers {
223                 for _, rp := range peers {
224                         if rp.requestablePiecesRemaining != 0 {
225                                 panic(rp.requestablePiecesRemaining)
226                         }
227                         ret[rp.Id] = rp.nextState
228                 }
229         }
230         return ret
231 }