]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy.go
4682720a647dc66fd062cb45505be727922e8d47
[btrtrc.git] / request-strategy.go
1 package torrent
2
3 import (
4         "sort"
5         "time"
6         "unsafe"
7
8         "github.com/anacrolix/multiless"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10         "github.com/bradfitz/iter"
11 )
12
// clientPieceRequestOrder holds a client-wide ordering of pieces across all
// torrents, rebuilt and sorted on every doRequests pass to decide which
// pieces get request capacity first.
type clientPieceRequestOrder struct {
	// Snapshot of per-piece request state; reused between passes (truncated
	// with [:0]) to avoid reallocation.
	pieces []pieceRequestOrderPiece
}
16
// pieceRequestOrderPiece is a snapshot of one piece's request-relevant state,
// captured while walking all torrents in doRequests. Fields feed the
// clientPieceRequestOrder.less comparator.
type pieceRequestOrderPiece struct {
	t            *Torrent      // torrent this piece belongs to
	index        pieceIndex    // piece index within t
	prio         piecePriority // purePriority at snapshot time
	partial      bool          // piece was partially downloaded when snapshotted
	availability int64         // copy of the piece's availability counter
	request      bool          // false when the piece is ignored for requests (snapshot of !ignorePieceForRequests)
}
25
// Len returns the number of pieces currently held in the request order.
func (me *clientPieceRequestOrder) Len() int {
	return len(me.pieces)
}
29
30 func (me clientPieceRequestOrder) sort() {
31         sort.Slice(me.pieces, me.less)
32 }
33
// less orders pieces for requesting: higher priority first, then partially
// downloaded pieces first, then lower availability (rarer) first, then lower
// piece index. The first two comparisons pass j before i deliberately, which
// inverts multiless's ascending order to get "higher value sorts earlier".
func (me clientPieceRequestOrder) less(_i, _j int) bool {
	i := me.pieces[_i]
	j := me.pieces[_j]
	return multiless.New().Int(
		int(j.prio), int(i.prio), // descending: higher priority first
	).Bool(
		j.partial, i.partial, // descending: partial pieces first
	).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
	// NOTE(review): the final tiebreak compares index only, not torrent, so
	// equal-keyed pieces from different torrents have no defined relative order.
}
43
44 func (cl *Client) requester() {
45         for {
46                 func() {
47                         cl.lock()
48                         defer cl.unlock()
49                         cl.doRequests()
50                 }()
51                 select {
52                 case <-cl.closed.LockedChan(cl.locker()):
53                         return
54                 case <-time.After(100 * time.Millisecond):
55                 }
56         }
57 }
58
// requestsPeer accumulates the request state planned for one peer during a
// doRequests pass, applied at the end via applyPeerNextRequests.
type requestsPeer struct {
	// The peer being planned for.
	cur *Peer
	// The set of requests that should be outstanding after this pass.
	nextRequests map[Request]struct{}
	// Whether we must be interested in the peer to make the planned requests.
	nextInterest bool
	// Count of requestable pieces not yet processed for this peer; must reach
	// exactly zero by the end of the pass (checked with a panic).
	requestablePiecesRemaining int
}
65
66 func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
67         return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
68 }
69
// hasPiece reports whether the underlying peer claims to have piece i.
func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
	return rp.cur.peerHasPiece(i)
}
73
// pieceAllowedFast reports whether piece p is in the peer's allowed-fast set,
// i.e. may be requested even while the peer is choking us.
func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
	return rp.cur.peerAllowedFast.Contains(p)
}
77
// choking reports whether the peer is currently choking us.
func (rp *requestsPeer) choking() bool {
	return rp.cur.peerChoking
}
81
82 func (rp *requestsPeer) hasExistingRequest(r Request) bool {
83         _, ok := rp.cur.requests[r]
84         return ok
85 }
86
87 func (rp *requestsPeer) canFitRequest() bool {
88         return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
89 }
90
91 // Returns true if it is added and wasn't there before.
92 func (rp *requestsPeer) addNextRequest(r Request) bool {
93         _, ok := rp.nextRequests[r]
94         if ok {
95                 return false
96         }
97         rp.nextRequests[r] = struct{}{}
98         return true
99 }
100
// peersForPieceRequests pairs a requestsPeer with a count of how many chunk
// requests have been assigned to that peer for the piece currently being
// distributed in doRequests.
type peersForPieceRequests struct {
	// Requests assigned to this peer for the current piece only.
	requestsInPiece int
	*requestsPeer
}
105
106 func (me *peersForPieceRequests) addNextRequest(r Request) {
107         if me.requestsPeer.addNextRequest(r) {
108                 return
109                 me.requestsInPiece++
110         }
111 }
112
// doRequests recomputes, in a single pass over every torrent and peer, the
// complete set of outstanding chunk requests and interest flags, then applies
// them. Called with the client lock held (see requester).
func (cl *Client) doRequests() {
	// Reuse the client-level slice between passes to avoid reallocation.
	requestOrder := &cl.pieceRequestOrder
	requestOrder.pieces = requestOrder.pieces[:0]
	allPeers := make(map[*Torrent][]*requestsPeer)
	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
	// TorrentImpl.
	storageLeft := make(map[*func() *int64]*int64)
	for _, t := range cl.torrents {
		// TODO: We could do metainfo requests here.
		if !t.haveInfo() {
			continue
		}
		key := t.storage.Capacity
		if key != nil {
			// Sample each distinct capacity function at most once per pass.
			if _, ok := storageLeft[key]; !ok {
				storageLeft[key] = (*key)()
			}
		}
		// Wrap each live peer in a fresh planning struct with an empty
		// next-request set.
		var peers []*requestsPeer
		t.iterPeers(func(p *Peer) {
			if !p.closed.IsSet() {
				peers = append(peers, &requestsPeer{
					cur:          p,
					nextRequests: make(map[Request]struct{}),
				})
			}
		})
		for i := range iter.N(t.numPieces()) {
			tp := t.piece(i)
			// NB: shadows the peer_protocol package alias `pp` within this loop.
			pp := tp.purePriority()
			request := !t.ignorePieceForRequests(i)
			requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
				t:            t,
				index:        i,
				prio:         pp,
				partial:      t.piecePartiallyDownloaded(i),
				availability: tp.availability,
				request:      request,
			})
			if request {
				// Count requestable pieces per peer; decremented back to zero
				// in the distribution loop below (invariant checked at the end).
				for _, p := range peers {
					if p.canRequestPiece(i) {
						p.requestablePiecesRemaining++
					}
				}
			}
		}
		allPeers[t] = peers
	}
	// Most-wanted pieces first; see clientPieceRequestOrder.less.
	requestOrder.sort()
	for _, p := range requestOrder.pieces {
		torrentPiece := p.t.piece(p.index)
		// Skip pieces that no longer fit in the remaining storage capacity.
		// Note: capacity is consumed before the request flag is checked below.
		if left := storageLeft[p.t.storage.Capacity]; left != nil {
			if *left < int64(torrentPiece.length()) {
				continue
			}
			*left -= int64(torrentPiece.length())
		}
		if !p.request {
			continue
		}
		peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
		for _, peer := range allPeers[p.t] {
			peersForPiece = append(peersForPiece, &peersForPieceRequests{
				requestsInPiece: 0,
				requestsPeer:    peer,
			})
		}
		// Orders peers from most to least preferred for the next chunk: fewest
		// requests already in this piece, fewest other requestable pieces
		// remaining, highest download rate, earliest completed handshake, then
		// pointer identity as a deterministic final tiebreak.
		sortPeersForPiece := func() {
			sort.Slice(peersForPiece, func(i, j int) bool {
				return multiless.New().Int(
					peersForPiece[i].requestsInPiece,
					peersForPiece[j].requestsInPiece,
				).Int(
					peersForPiece[i].requestablePiecesRemaining,
					peersForPiece[j].requestablePiecesRemaining,
				).Float64(
					peersForPiece[j].cur.downloadRate(),
					peersForPiece[i].cur.downloadRate(),
				).EagerSameLess(
					peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
					peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
					// TODO: Probably peer priority can come next
				).Uintptr(
					uintptr(unsafe.Pointer(peersForPiece[j].cur)),
					uintptr(unsafe.Pointer(peersForPiece[i].cur)),
				).Less()
			})
		}
		pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
		torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
			// Here `pp` is the peer_protocol package alias (the shadowing
			// variable above is out of scope).
			req := Request{pp.Integer(p.index), chunk}
			pendingChunksRemaining--
			sortPeersForPiece()
			skipped := 0
			// Try up to the number of peers that could legitimately receive the request equal to
			// the number of chunks left. This should ensure that only the best peers serve the last
			// few chunks in a piece.
			// First pass: prefer a peer that already has this request
			// outstanding, avoiding a cancel-and-reissue in applyPeerNextRequests.
			for _, peer := range peersForPiece {
				if !peer.canFitRequest() || !peer.hasPiece(p.index) || (!peer.pieceAllowedFast(p.index) && peer.choking()) {
					continue
				}
				if skipped > pendingChunksRemaining {
					break
				}
				if !peer.hasExistingRequest(req) {
					skipped++
					continue
				}
				if !peer.pieceAllowedFast(p.index) {
					// We must stay interested for this.
					peer.nextInterest = true
				}
				peer.addNextRequest(req)
				return true
			}
			// Second pass: assign the chunk to the most preferred peer that
			// can take it at all.
			for _, peer := range peersForPiece {
				if !peer.canFitRequest() {
					continue
				}
				if !peer.hasPiece(p.index) {
					continue
				}
				if !peer.pieceAllowedFast(p.index) {
					// TODO: Verify that's okay to stay uninterested if we request allowed fast
					// pieces.
					peer.nextInterest = true
					if peer.choking() {
						continue
					}
				}
				peer.addNextRequest(req)
				return true
			}
			// No peer could take this chunk; continue with the rest anyway.
			return true
		})
		// Invariant: iterUndirtiedChunks must visit exactly
		// pieceNumPendingChunks chunks.
		if pendingChunksRemaining != 0 {
			panic(pendingChunksRemaining)
		}
		for _, peer := range peersForPiece {
			if peer.canRequestPiece(p.index) {
				peer.requestablePiecesRemaining--
			}
		}
	}
	for _, peers := range allPeers {
		for _, rp := range peers {
			// Invariant: every requestable piece was considered for every peer.
			if rp.requestablePiecesRemaining != 0 {
				panic(rp.requestablePiecesRemaining)
			}
			applyPeerNextRequests(rp)
		}
	}
}
267
268 func applyPeerNextRequests(rp *requestsPeer) {
269         p := rp.cur
270         p.setInterested(rp.nextInterest)
271         for req := range p.requests {
272                 if _, ok := rp.nextRequests[req]; !ok {
273                         p.cancel(req)
274                 }
275         }
276         for req := range rp.nextRequests {
277                 err := p.request(req)
278                 if err != nil {
279                         panic(err)
280                 } else {
281                         //log.Print(req)
282                 }
283         }
284 }