8 "github.com/anacrolix/multiless"
9 pp "github.com/anacrolix/torrent/peer_protocol"
10 "github.com/bradfitz/iter"
13 type clientPieceRequestOrder struct {
14 pieces []pieceRequestOrderPiece
17 type pieceRequestOrderPiece struct {
25 func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
26 for i := range iter.N(numPieces) {
27 me.pieces = append(me.pieces, pieceRequestOrderPiece{
34 func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
35 newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
36 for _, p := range me.pieces {
38 newPieces = append(newPieces, p)
44 func (me clientPieceRequestOrder) sort() {
45 sort.SliceStable(me.pieces, me.less)
48 func (me clientPieceRequestOrder) update() {
49 for i := range me.pieces {
51 p.prio = p.t.piece(p.index).uncachedPriority()
52 p.partial = p.t.piecePartiallyDownloaded(p.index)
53 p.availability = p.t.pieceAvailability(p.index)
57 func (me clientPieceRequestOrder) less(_i, _j int) bool {
60 return multiless.New().Int(
61 int(j.prio), int(i.prio),
65 i.availability, j.availability,
69 func (cl *Client) requester() {
77 case <-cl.closed.LockedChan(cl.locker()):
79 case <-time.After(10 * time.Millisecond):
84 func (cl *Client) doRequests() {
85 requestOrder := clientPieceRequestOrder{}
86 allPeers := make(map[*Torrent][]*Peer)
87 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
89 storageLeft := make(map[*func() *int64]*int64)
90 for _, t := range cl.torrents {
91 // TODO: We could do metainfo requests here.
93 if t.storage.Capacity != nil {
94 if _, ok := storageLeft[t.storage.Capacity]; !ok {
95 storageLeft[t.storage.Capacity] = (*t.storage.Capacity)()
98 requestOrder.addPieces(t, t.numPieces())
101 t.iterPeers(func(p *Peer) {
102 if !p.closed.IsSet() {
103 peers = append(peers, p)
106 // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
107 sort.Slice(peers, func(i, j int) bool {
108 return multiless.New().Float64(
109 peers[j].downloadRate(), peers[i].downloadRate(),
111 uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
115 requestOrder.update()
117 // For a given piece, the set of allPeers indices that absorbed requests for the piece.
118 contributed := make(map[int]struct{})
119 for _, p := range requestOrder.pieces {
120 if p.t.ignorePieceForRequests(p.index) {
123 peers := allPeers[p.t]
124 torrentPiece := p.t.piece(p.index)
125 if left := storageLeft[p.t.storage.Capacity]; left != nil {
126 if *left < int64(torrentPiece.length()) {
129 *left -= int64(torrentPiece.length())
131 p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
132 req := Request{pp.Integer(p.index), chunk}
133 const skipAlreadyRequested = false
134 if skipAlreadyRequested {
135 alreadyRequested := false
136 p.t.iterPeers(func(p *Peer) {
137 if _, ok := p.requests[req]; ok {
138 alreadyRequested = true
141 if alreadyRequested {
145 alreadyRequested := false
146 for peerIndex, peer := range peers {
147 if alreadyRequested {
148 // Cancel all requests from "slower" peers after the one that requested it.
151 err := peer.request(req)
153 contributed[peerIndex] = struct{}{}
154 alreadyRequested = true
155 //log.Printf("requested %v", req)
161 // Move requestees for this piece to the back.
162 lastIndex := len(peers) - 1
163 for peerIndex := range contributed {
164 peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
165 delete(contributed, peerIndex)
169 for _, t := range cl.torrents {
170 t.iterPeers(func(p *Peer) {
171 if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
172 p.setInterested(false)
178 //func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
179 // chunkIndices := p.dirtyChunks().Copy()
180 // chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
181 // return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
182 // ci, err := chunkIndices.RB.Select(uint32(i))
186 // return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
191 //func iterUnbiasedPieceRequestOrder(
192 // cn requestStrategyConnection,
193 // f func(piece pieceIndex) bool,
194 // pieceRequestOrder []pieceIndex,
196 // cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
197 // for _, i := range pieceRequestOrder {
198 // if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {