8 "github.com/anacrolix/multiless"
9 pp "github.com/anacrolix/torrent/peer_protocol"
10 "github.com/bradfitz/iter"
// clientPieceRequestOrder holds the list of pieces that request scheduling
// walks. Its methods append to, filter, refresh and sort the pieces slice.
13 type clientPieceRequestOrder struct {
// Entries are appended by addPieces.
14 pieces []pieceRequestOrderPiece
// pieceRequestOrderPiece is the element type of clientPieceRequestOrder.pieces.
// NOTE(review): the field list is not visible in this excerpt. Code elsewhere
// in the file accesses t, index, prio, partial and availability on values
// that appear to be of this type; confirm against the full definition.
17 type pieceRequestOrderPiece struct {
// addPieces appends entries for torrent t to me.pieces, one per iteration of
// iter.N(numPieces).
// NOTE(review): the struct literal's fields are elided here; presumably each
// entry records t and the loop index i as the piece index. Confirm.
25 func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
26 for i := range iter.N(numPieces) {
27 me.pieces = append(me.pieces, pieceRequestOrderPiece{
// removePieces copies entries from me.pieces into a newly allocated slice.
// NOTE(review): the filter condition and the assignment back to me.pieces are
// elided; presumably only entries whose torrent is not t are kept. Confirm.
34 func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
// The capacity is the current length minus t.numPieces().
// NOTE(review): this assumes all of t's pieces are currently present. If they
// are not, the capacity can be negative and make panics at run time; verify
// against callers.
35 newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
36 for _, p := range me.pieces {
38 newPieces = append(newPieces, p)
// sort stably reorders me.pieces in place, using me.less as the comparison.
// The value receiver is sufficient here: the copied slice header shares its
// backing array with the caller's, so the reordering is visible to the caller.
44 func (me clientPieceRequestOrder) sort() {
45 sort.SliceStable(me.pieces, me.less)
// update walks me.pieces and refreshes the prio, partial and availability
// values from the owning torrent's current state for the same piece index.
48 func (me *clientPieceRequestOrder) update() {
49 for i := range me.pieces {
// NOTE(review): the definition of p is elided; presumably it is a pointer to
// me.pieces[i] so that the assignments below persist. Confirm.
51 tp := p.t.piece(p.index)
52 p.prio = tp.uncachedPriority()
53 p.partial = p.t.piecePartiallyDownloaded(p.index)
54 p.availability = tp.availability
// less is the comparison passed to sort.SliceStable by sort. It takes two
// indices and returns the result of a multiless comparison chain.
// NOTE(review): the bindings of i and j are elided; presumably they are
// me.pieces[_i] and me.pieces[_j]. Confirm.
58 func (me clientPieceRequestOrder) less(_i, _j int) bool {
61 return multiless.New().Int(
// prio is compared with j before i, i.e. in descending order, assuming
// multiless orders a pair by left < right.
62 int(j.prio), int(i.prio),
// availability is compared with i before j, i.e. in ascending order under
// the same assumption. Any comparisons between these two steps are elided
// from this excerpt.
66 i.availability, j.availability,
// requester contains a select over the client's closed channel and a 10ms
// timer.
// NOTE(review): the enclosing loop and the work done per wakeup are elided;
// presumably this runs doRequests repeatedly until the client is closed.
// Confirm against the full body.
70 func (cl *Client) requester() {
// Taken once the client's closed event fires.
78 case <-cl.closed.LockedChan(cl.locker()):
// Otherwise wake up after 10ms. time.After creates a new timer each time
// this select is evaluated.
80 case <-time.After(10 * time.Millisecond):
// doRequests makes one scheduling pass over every torrent in cl.torrents. It
// builds a piece list and a sorted peer list per torrent, then for each piece
// that is not ignored it offers that piece's undirtied chunks to the
// torrent's peers in order. Finally it clears interest on peers that meet the
// condition in the last loop.
// NOTE(review): several statements are elided from this excerpt (early
// exits, error handling, loop bookkeeping); comments that depend on them are
// marked as assumptions.
85 func (cl *Client) doRequests() {
// The ordering is rebuilt from scratch on every call.
86 requestOrder := clientPieceRequestOrder{}
// Per-torrent peer lists, read back as allPeers[p.t] in the piece loop.
// NOTE(review): the store into this map is elided; presumably it receives the
// sorted peers slice built below. Confirm.
87 allPeers := make(map[*Torrent][]*Peer)
88 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
90 storageLeft := make(map[*func() *int64]*int64)
91 for _, t := range cl.torrents {
92 // TODO: We could do metainfo requests here.
// Each distinct Capacity pointer is called at most once per pass, so torrents
// that share a pointer draw down the same remaining-capacity value.
94 if t.storage.Capacity != nil {
95 if _, ok := storageLeft[t.storage.Capacity]; !ok {
96 storageLeft[t.storage.Capacity] = (*t.storage.Capacity)()
99 requestOrder.addPieces(t, t.numPieces())
// Collect only peers whose closed flag is not set.
102 t.iterPeers(func(p *Peer) {
103 if !p.closed.IsSet() {
104 peers = append(peers, p)
107 // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
108 sort.Slice(peers, func(i, j int) bool {
109 return multiless.New().Float64(
// Download rate is compared with j before i, i.e. descending.
110 peers[j].downloadRate(), peers[i].downloadRate(),
// Last tie-break is the pointer address, so two distinct peers never compare
// equal. Any comparisons between these two steps are elided from this excerpt.
112 uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
// Refresh prio/partial/availability on every entry before the pieces are walked.
116 requestOrder.update()
118 // For a given piece, the set of allPeers indices that absorbed requests for the piece.
119 contributed := make(map[int]struct{})
120 for _, p := range requestOrder.pieces {
121 if p.t.ignorePieceForRequests(p.index) {
124 peers := allPeers[p.t]
125 torrentPiece := p.t.piece(p.index)
// left is nil when the torrent has no Capacity function (missing map key) or
// the function returned nil; the capacity check is then not applied.
126 if left := storageLeft[p.t.storage.Capacity]; left != nil {
// NOTE(review): the body of this branch is elided; presumably the piece is
// skipped when it does not fit in the remaining capacity. Confirm.
127 if *left < int64(torrentPiece.length()) {
// The full piece length is deducted from the remaining capacity.
130 *left -= int64(torrentPiece.length())
132 p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
133 req := Request{pp.Integer(p.index), chunk}
// The constant is false, so the branch guarded by it never executes.
134 const skipAlreadyRequested = false
135 if skipAlreadyRequested {
136 alreadyRequested := false
// Inside this closure p names the peer and shadows the outer piece variable p.
137 p.t.iterPeers(func(p *Peer) {
138 if _, ok := p.requests[req]; ok {
139 alreadyRequested = true
142 if alreadyRequested {
// Offer the chunk to peers in sorted order.
// NOTE(review): the cancel call and the handling of err are elided;
// presumably the first peer that accepts the request keeps it. Confirm.
146 alreadyRequested := false
147 for peerIndex, peer := range peers {
148 if alreadyRequested {
149 // Cancel all requests from "slower" peers after the one that requested it.
152 err := peer.request(req)
154 contributed[peerIndex] = struct{}{}
155 alreadyRequested = true
156 //log.Printf("requested %v", req)
162 // Move requestees for this piece to the back.
163 lastIndex := len(peers) - 1
// Map iteration order is unspecified, so the relative order of the peers
// moved to the tail can differ between runs. Deleting the current key while
// ranging is permitted and empties contributed for the next piece.
// NOTE(review): an update to lastIndex is presumably elided here. Confirm.
164 for peerIndex := range contributed {
165 peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
166 delete(contributed, peerIndex)
// Final sweep: clear interest on any peer that is not choking us, has no
// local requests, and whose write buffer is not full.
170 for _, t := range cl.torrents {
171 t.iterPeers(func(p *Peer) {
172 if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
173 p.setInterested(false)
179 //func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
180 // chunkIndices := p.dirtyChunks().Copy()
181 // chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
182 // return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
183 // ci, err := chunkIndices.RB.Select(uint32(i))
187 // return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
192 //func iterUnbiasedPieceRequestOrder(
193 // cn requestStrategyConnection,
194 // f func(piece pieceIndex) bool,
195 // pieceRequestOrder []pieceIndex,
197 // cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
198 // for _, i := range pieceRequestOrder {
199 // if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {