Sergey Matveev's repositories - btrtrc.git/blob - request-strategy.go
Fix download rate, status output
[btrtrc.git] / request-strategy.go
1 package torrent
2
3 import (
4         "sort"
5         "time"
6         "unsafe"
7
8         "github.com/anacrolix/multiless"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10         "github.com/bradfitz/iter"
11 )
12
13 type clientPieceRequestOrder struct {
14         pieces []pieceRequestOrderPiece
15 }
16
17 type pieceRequestOrderPiece struct {
18         t            *Torrent
19         index        pieceIndex
20         prio         piecePriority
21         partial      bool
22         availability int
23 }
24
25 func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
26         for i := range iter.N(numPieces) {
27                 me.pieces = append(me.pieces, pieceRequestOrderPiece{
28                         t:     t,
29                         index: i,
30                 })
31         }
32 }
33
34 func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
35         newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
36         for _, p := range me.pieces {
37                 if p.t != t {
38                         newPieces = append(newPieces, p)
39                 }
40         }
41         me.pieces = newPieces
42 }
43
44 func (me clientPieceRequestOrder) sort() {
45         sort.SliceStable(me.pieces, me.less)
46 }
47
48 func (me clientPieceRequestOrder) update() {
49         for i := range me.pieces {
50                 p := &me.pieces[i]
51                 p.prio = p.t.piece(p.index).uncachedPriority()
52                 p.partial = p.t.piecePartiallyDownloaded(p.index)
53                 p.availability = p.t.pieceAvailability(p.index)
54         }
55 }
56
57 func (me clientPieceRequestOrder) less(_i, _j int) bool {
58         i := me.pieces[_i]
59         j := me.pieces[_j]
60         return multiless.New().Int(
61                 int(j.prio), int(i.prio),
62         ).Bool(
63                 j.partial, i.partial,
64         ).Int(
65                 i.availability, j.availability,
66         ).Less()
67 }
68
69 func (cl *Client) requester() {
70         for {
71                 func() {
72                         cl.lock()
73                         defer cl.unlock()
74                         cl.doRequests()
75                 }()
76                 select {
77                 case <-cl.closed.LockedChan(cl.locker()):
78                         return
79                 case <-time.After(10 * time.Millisecond):
80                 }
81         }
82 }
83
84 func (cl *Client) doRequests() {
85         requestOrder := clientPieceRequestOrder{}
86         allPeers := make(map[*Torrent][]*Peer)
87         // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
88         // TorrentImpl.
89         storageLeft := make(map[*func() *int64]*int64)
90         for _, t := range cl.torrents {
91                 // TODO: We could do metainfo requests here.
92                 if t.haveInfo() {
93                         if t.storage.Capacity != nil {
94                                 if _, ok := storageLeft[t.storage.Capacity]; !ok {
95                                         storageLeft[t.storage.Capacity] = (*t.storage.Capacity)()
96                                 }
97                         }
98                         requestOrder.addPieces(t, t.numPieces())
99                 }
100                 var peers []*Peer
101                 t.iterPeers(func(p *Peer) {
102                         if !p.closed.IsSet() {
103                                 peers = append(peers, p)
104                         }
105                 })
106                 // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
107                 sort.Slice(peers, func(i, j int) bool {
108                         return multiless.New().Float64(
109                                 peers[j].downloadRate(), peers[i].downloadRate(),
110                         ).Uintptr(
111                                 uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
112                 })
113                 allPeers[t] = peers
114         }
115         requestOrder.update()
116         requestOrder.sort()
117         // For a given piece, the set of allPeers indices that absorbed requests for the piece.
118         contributed := make(map[int]struct{})
119         for _, p := range requestOrder.pieces {
120                 if p.t.ignorePieceForRequests(p.index) {
121                         continue
122                 }
123                 peers := allPeers[p.t]
124                 torrentPiece := p.t.piece(p.index)
125                 if left := storageLeft[p.t.storage.Capacity]; left != nil {
126                         if *left < int64(torrentPiece.length()) {
127                                 continue
128                         }
129                         *left -= int64(torrentPiece.length())
130                 }
131                 p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
132                         req := Request{pp.Integer(p.index), chunk}
133                         const skipAlreadyRequested = false
134                         if skipAlreadyRequested {
135                                 alreadyRequested := false
136                                 p.t.iterPeers(func(p *Peer) {
137                                         if _, ok := p.requests[req]; ok {
138                                                 alreadyRequested = true
139                                         }
140                                 })
141                                 if alreadyRequested {
142                                         return true
143                                 }
144                         }
145                         alreadyRequested := false
146                         for peerIndex, peer := range peers {
147                                 if alreadyRequested {
148                                         // Cancel all requests from "slower" peers after the one that requested it.
149                                         peer.cancel(req)
150                                 } else {
151                                         err := peer.request(req)
152                                         if err == nil {
153                                                 contributed[peerIndex] = struct{}{}
154                                                 alreadyRequested = true
155                                                 //log.Printf("requested %v", req)
156                                         }
157                                 }
158                         }
159                         return true
160                 })
161                 // Move requestees for this piece to the back.
162                 lastIndex := len(peers) - 1
163                 for peerIndex := range contributed {
164                         peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
165                         delete(contributed, peerIndex)
166                         lastIndex--
167                 }
168         }
169         for _, t := range cl.torrents {
170                 t.iterPeers(func(p *Peer) {
171                         if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
172                                 p.setInterested(false)
173                         }
174                 })
175         }
176 }
177
178 //func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
179 //      chunkIndices := p.dirtyChunks().Copy()
180 //      chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
181 //      return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
182 //              ci, err := chunkIndices.RB.Select(uint32(i))
183 //              if err != nil {
184 //                      panic(err)
185 //              }
186 //              return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
187 //      })
188 //}
189
190 //
191 //func iterUnbiasedPieceRequestOrder(
192 //      cn requestStrategyConnection,
193 //      f func(piece pieceIndex) bool,
194 //      pieceRequestOrder []pieceIndex,
195 //) bool {
196 //      cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
197 //      for _, i := range pieceRequestOrder {
198 //              if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {
199 //                      continue
200 //              }
201 //              if !f(i) {
202 //                      return false
203 //              }
204 //      }
205 //      return true
206 //}