]> Sergey Matveev's repositories - btrtrc.git/blob - request-strategy.go
Track peer availability at the Torrent-level
[btrtrc.git] / request-strategy.go
1 package torrent
2
3 import (
4         "sort"
5         "time"
6         "unsafe"
7
8         "github.com/anacrolix/multiless"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10         "github.com/bradfitz/iter"
11 )
12
13 type clientPieceRequestOrder struct {
14         pieces []pieceRequestOrderPiece
15 }
16
17 type pieceRequestOrderPiece struct {
18         t            *Torrent
19         index        pieceIndex
20         prio         piecePriority
21         partial      bool
22         availability int64
23 }
24
25 func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
26         for i := range iter.N(numPieces) {
27                 me.pieces = append(me.pieces, pieceRequestOrderPiece{
28                         t:     t,
29                         index: i,
30                 })
31         }
32 }
33
34 func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
35         newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
36         for _, p := range me.pieces {
37                 if p.t != t {
38                         newPieces = append(newPieces, p)
39                 }
40         }
41         me.pieces = newPieces
42 }
43
44 func (me clientPieceRequestOrder) sort() {
45         sort.SliceStable(me.pieces, me.less)
46 }
47
48 func (me *clientPieceRequestOrder) update() {
49         for i := range me.pieces {
50                 p := &me.pieces[i]
51                 tp := p.t.piece(p.index)
52                 p.prio = tp.uncachedPriority()
53                 p.partial = p.t.piecePartiallyDownloaded(p.index)
54                 p.availability = tp.availability
55         }
56 }
57
58 func (me clientPieceRequestOrder) less(_i, _j int) bool {
59         i := me.pieces[_i]
60         j := me.pieces[_j]
61         return multiless.New().Int(
62                 int(j.prio), int(i.prio),
63         ).Bool(
64                 j.partial, i.partial,
65         ).Int64(
66                 i.availability, j.availability,
67         ).Less()
68 }
69
70 func (cl *Client) requester() {
71         for {
72                 func() {
73                         cl.lock()
74                         defer cl.unlock()
75                         cl.doRequests()
76                 }()
77                 select {
78                 case <-cl.closed.LockedChan(cl.locker()):
79                         return
80                 case <-time.After(10 * time.Millisecond):
81                 }
82         }
83 }
84
85 func (cl *Client) doRequests() {
86         requestOrder := clientPieceRequestOrder{}
87         allPeers := make(map[*Torrent][]*Peer)
88         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
89         // TorrentImpl.
90         storageLeft := make(map[*func() *int64]*int64)
91         for _, t := range cl.torrents {
92                 // TODO: We could do metainfo requests here.
93                 if t.haveInfo() {
94                         if t.storage.Capacity != nil {
95                                 if _, ok := storageLeft[t.storage.Capacity]; !ok {
96                                         storageLeft[t.storage.Capacity] = (*t.storage.Capacity)()
97                                 }
98                         }
99                         requestOrder.addPieces(t, t.numPieces())
100                 }
101                 var peers []*Peer
102                 t.iterPeers(func(p *Peer) {
103                         if !p.closed.IsSet() {
104                                 peers = append(peers, p)
105                         }
106                 })
107                 // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
108                 sort.Slice(peers, func(i, j int) bool {
109                         return multiless.New().Float64(
110                                 peers[j].downloadRate(), peers[i].downloadRate(),
111                         ).Uintptr(
112                                 uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
113                 })
114                 allPeers[t] = peers
115         }
116         requestOrder.update()
117         requestOrder.sort()
118         // For a given piece, the set of allPeers indices that absorbed requests for the piece.
119         contributed := make(map[int]struct{})
120         for _, p := range requestOrder.pieces {
121                 if p.t.ignorePieceForRequests(p.index) {
122                         continue
123                 }
124                 peers := allPeers[p.t]
125                 torrentPiece := p.t.piece(p.index)
126                 if left := storageLeft[p.t.storage.Capacity]; left != nil {
127                         if *left < int64(torrentPiece.length()) {
128                                 continue
129                         }
130                         *left -= int64(torrentPiece.length())
131                 }
132                 p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
133                         req := Request{pp.Integer(p.index), chunk}
134                         const skipAlreadyRequested = false
135                         if skipAlreadyRequested {
136                                 alreadyRequested := false
137                                 p.t.iterPeers(func(p *Peer) {
138                                         if _, ok := p.requests[req]; ok {
139                                                 alreadyRequested = true
140                                         }
141                                 })
142                                 if alreadyRequested {
143                                         return true
144                                 }
145                         }
146                         alreadyRequested := false
147                         for peerIndex, peer := range peers {
148                                 if alreadyRequested {
149                                         // Cancel all requests from "slower" peers after the one that requested it.
150                                         peer.cancel(req)
151                                 } else {
152                                         err := peer.request(req)
153                                         if err == nil {
154                                                 contributed[peerIndex] = struct{}{}
155                                                 alreadyRequested = true
156                                                 //log.Printf("requested %v", req)
157                                         }
158                                 }
159                         }
160                         return true
161                 })
162                 // Move requestees for this piece to the back.
163                 lastIndex := len(peers) - 1
164                 for peerIndex := range contributed {
165                         peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
166                         delete(contributed, peerIndex)
167                         lastIndex--
168                 }
169         }
170         for _, t := range cl.torrents {
171                 t.iterPeers(func(p *Peer) {
172                         if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
173                                 p.setInterested(false)
174                         }
175                 })
176         }
177 }
178
179 //func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
180 //      chunkIndices := p.dirtyChunks().Copy()
181 //      chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
182 //      return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
183 //              ci, err := chunkIndices.RB.Select(uint32(i))
184 //              if err != nil {
185 //                      panic(err)
186 //              }
187 //              return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
188 //      })
189 //}
190
191 //
192 //func iterUnbiasedPieceRequestOrder(
193 //      cn requestStrategyConnection,
194 //      f func(piece pieceIndex) bool,
195 //      pieceRequestOrder []pieceIndex,
196 //) bool {
197 //      cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
198 //      for _, i := range pieceRequestOrder {
199 //              if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {
200 //                      continue
201 //              }
202 //              if !f(i) {
203 //                      return false
204 //              }
205 //      }
206 //      return true
207 //}