]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy.go
Rework to improve work stealing and try to thread peers through all request pieces
[btrtrc.git] / request-strategy.go
1 package torrent
2
3 import (
4         "sort"
5         "time"
6         "unsafe"
7
8         "github.com/anacrolix/multiless"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10         "github.com/bradfitz/iter"
11 )
12
13 type clientPieceRequestOrder struct {
14         pieces []pieceRequestOrderPiece
15 }
16
17 type pieceRequestOrderPiece struct {
18         t            *Torrent
19         index        pieceIndex
20         prio         piecePriority
21         partial      bool
22         availability int64
23         request      bool
24 }
25
26 func (me *clientPieceRequestOrder) Len() int {
27         return len(me.pieces)
28 }
29
30 func (me clientPieceRequestOrder) sort() {
31         sort.Slice(me.pieces, me.less)
32 }
33
34 func (me clientPieceRequestOrder) less(_i, _j int) bool {
35         i := me.pieces[_i]
36         j := me.pieces[_j]
37         return multiless.New().Int(
38                 int(j.prio), int(i.prio),
39         ).Bool(
40                 j.partial, i.partial,
41         ).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
42 }
43
44 func (cl *Client) requester() {
45         for {
46                 func() {
47                         cl.lock()
48                         defer cl.unlock()
49                         cl.doRequests()
50                 }()
51                 select {
52                 case <-cl.closed.LockedChan(cl.locker()):
53                         return
54                 case <-time.After(100 * time.Millisecond):
55                 }
56         }
57 }
58
59 type requestsPeer struct {
60         cur                        *Peer
61         nextRequests               map[Request]struct{}
62         nextInterest               bool
63         requestablePiecesRemaining int
64 }
65
66 func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
67         return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
68 }
69
70 func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
71         return rp.cur.peerHasPiece(i)
72 }
73
74 func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
75         return rp.cur.peerAllowedFast.Contains(p)
76 }
77
78 func (rp *requestsPeer) choking() bool {
79         return rp.cur.peerChoking
80 }
81
82 func (rp *requestsPeer) hasExistingRequest(r Request) bool {
83         _, ok := rp.cur.requests[r]
84         return ok
85 }
86
87 func (rp *requestsPeer) canFitRequest() bool {
88         return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
89 }
90
91 // Returns true if it is added and wasn't there before.
92 func (rp *requestsPeer) addNextRequest(r Request) bool {
93         _, ok := rp.nextRequests[r]
94         if ok {
95                 return false
96         }
97         rp.nextRequests[r] = struct{}{}
98         return true
99 }
100
101 type peersForPieceRequests struct {
102         requestsInPiece int
103         *requestsPeer
104 }
105
106 func (me *peersForPieceRequests) addNextRequest(r Request) {
107         if me.requestsPeer.addNextRequest(r) {
108                 return
109                 me.requestsInPiece++
110         }
111 }
112
113 func (cl *Client) doRequests() {
114         requestOrder := &cl.pieceRequestOrder
115         requestOrder.pieces = requestOrder.pieces[:0]
116         allPeers := make(map[*Torrent][]*requestsPeer)
117         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
118         // TorrentImpl.
119         storageLeft := make(map[*func() *int64]*int64)
120         for _, t := range cl.torrents {
121                 // TODO: We could do metainfo requests here.
122                 if !t.haveInfo() {
123                         continue
124                 }
125                 key := t.storage.Capacity
126                 if key != nil {
127                         if _, ok := storageLeft[key]; !ok {
128                                 storageLeft[key] = (*key)()
129                         }
130                 }
131                 var peers []*requestsPeer
132                 t.iterPeers(func(p *Peer) {
133                         if !p.closed.IsSet() {
134                                 peers = append(peers, &requestsPeer{
135                                         cur:          p,
136                                         nextRequests: make(map[Request]struct{}),
137                                 })
138                         }
139                 })
140                 for i := range iter.N(t.numPieces()) {
141                         tp := t.piece(i)
142                         pp := tp.purePriority()
143                         request := !t.ignorePieceForRequests(i)
144                         requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
145                                 t:            t,
146                                 index:        i,
147                                 prio:         pp,
148                                 partial:      t.piecePartiallyDownloaded(i),
149                                 availability: tp.availability,
150                                 request:      request,
151                         })
152                         if request {
153                                 for _, p := range peers {
154                                         if p.canRequestPiece(i) {
155                                                 p.requestablePiecesRemaining++
156                                         }
157                                 }
158                         }
159                 }
160                 allPeers[t] = peers
161         }
162         requestOrder.sort()
163         for _, p := range requestOrder.pieces {
164                 torrentPiece := p.t.piece(p.index)
165                 if left := storageLeft[p.t.storage.Capacity]; left != nil {
166                         if *left < int64(torrentPiece.length()) {
167                                 continue
168                         }
169                         *left -= int64(torrentPiece.length())
170                 }
171                 if !p.request {
172                         continue
173                 }
174                 peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
175                 for _, peer := range allPeers[p.t] {
176                         peersForPiece = append(peersForPiece, &peersForPieceRequests{
177                                 requestsInPiece: 0,
178                                 requestsPeer:    peer,
179                         })
180                 }
181                 sortPeersForPiece := func() {
182                         sort.Slice(peersForPiece, func(i, j int) bool {
183                                 return multiless.New().Bool(
184                                         peersForPiece[j].canFitRequest(),
185                                         peersForPiece[i].canFitRequest(),
186                                 ).Int(
187                                         peersForPiece[i].requestsInPiece,
188                                         peersForPiece[j].requestsInPiece,
189                                 ).Int(
190                                         peersForPiece[i].requestablePiecesRemaining,
191                                         peersForPiece[j].requestablePiecesRemaining,
192                                 ).Float64(
193                                         peersForPiece[j].cur.downloadRate(),
194                                         peersForPiece[i].cur.downloadRate(),
195                                 ).EagerSameLess(
196                                         peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
197                                         peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
198                                         // TODO: Probably peer priority can come next
199                                 ).Uintptr(
200                                         uintptr(unsafe.Pointer(peersForPiece[j].cur)),
201                                         uintptr(unsafe.Pointer(peersForPiece[i].cur)),
202                                 ).Less()
203                         })
204                 }
205                 pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
206                 torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
207                         req := Request{pp.Integer(p.index), chunk}
208                         pendingChunksRemaining--
209                         sortPeersForPiece()
210                         for i, peer := range peersForPiece {
211                                 if i > pendingChunksRemaining {
212                                         break
213                                 }
214                                 if peer.hasExistingRequest(req) && peer.canFitRequest() {
215                                         peer.addNextRequest(req)
216                                         return true
217                                 }
218                         }
219                         for _, peer := range peersForPiece {
220                                 if !peer.canFitRequest() {
221                                         continue
222                                 }
223                                 if !peer.hasPiece(p.index) {
224                                         continue
225                                 }
226                                 if !peer.pieceAllowedFast(p.index) {
227                                         // TODO: Verify that's okay to stay uninterested if we request allowed fast
228                                         // pieces.
229                                         peer.nextInterest = true
230                                         if peer.choking() {
231                                                 continue
232                                         }
233                                 }
234                                 peer.addNextRequest(req)
235                                 return true
236                         }
237                         return true
238                 })
239                 for _, peer := range peersForPiece {
240                         if peer.canRequestPiece(p.index) {
241                                 peer.requestablePiecesRemaining--
242                         }
243                 }
244         }
245         for _, peers := range allPeers {
246                 for _, rp := range peers {
247                         if rp.requestablePiecesRemaining != 0 {
248                                 panic(rp.requestablePiecesRemaining)
249                         }
250                         applyPeerNextRequests(rp)
251                 }
252         }
253 }
254
255 func applyPeerNextRequests(rp *requestsPeer) {
256         p := rp.cur
257         p.setInterested(rp.nextInterest)
258         for req := range p.requests {
259                 if _, ok := rp.nextRequests[req]; !ok {
260                         p.cancel(req)
261                 }
262         }
263         for req := range rp.nextRequests {
264                 err := p.request(req)
265                 if err != nil {
266                         panic(err)
267                 } else {
268                         //log.Print(req)
269                 }
270         }
271 }