prio piecePriority
partial bool
availability int64
-}
-
-func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
- for i := range iter.N(numPieces) {
- me.pieces = append(me.pieces, pieceRequestOrderPiece{
- t: t,
- index: i,
- })
- }
+ request bool
}
func (me *clientPieceRequestOrder) Len() int {
return len(me.pieces)
}
-func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
- newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
- for _, p := range me.pieces {
- if p.t != t {
- newPieces = append(newPieces, p)
- }
- }
- me.pieces = newPieces
-}
-
func (me clientPieceRequestOrder) sort() {
sort.Slice(me.pieces, me.less)
}
-func (me *clientPieceRequestOrder) update() {
- for i := range me.pieces {
- p := &me.pieces[i]
- tp := p.t.piece(p.index)
- p.prio = tp.uncachedPriority()
- p.partial = p.t.piecePartiallyDownloaded(p.index)
- p.availability = tp.availability
- }
-}
-
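+// Pieces sort by priority (highest first), then partially-downloaded pieces first, then lowest
+// availability, then piece index as a final tiebreak.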
func (me clientPieceRequestOrder) less(_i, _j int) bool {
i := me.pieces[_i]
j := me.pieces[_j]
return multiless.New().Int(
int(j.prio), int(i.prio),
).Bool(
j.partial, i.partial,
- ).Int64(
- i.availability, j.availability,
- ).Less()
+ ).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
}
func (cl *Client) requester() {
}
}
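+// requestsPeer accumulates the requests we plan to issue to a peer in this pass, the interest
+// state to apply afterwards, and a count of pieces it can still serve requests for.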
+type requestsPeer struct {
+ cur *Peer
+ nextRequests map[Request]struct{}
+ nextInterest bool
+ requestablePiecesRemaining int
+}
+
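+// canRequestPiece reports whether we may request this piece from the peer: it must have the
+// piece, and must either not be choking us or have marked the piece allowed fast.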
+func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
+ return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
+}
+
+func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
+ return rp.cur.peerHasPiece(i)
+}
+
+func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
+ return rp.cur.peerAllowedFast.Contains(p)
+}
+
+func (rp *requestsPeer) choking() bool {
+ return rp.cur.peerChoking
+}
+
+func (rp *requestsPeer) hasExistingRequest(r Request) bool {
+ _, ok := rp.cur.requests[r]
+ return ok
+}
+
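+// canFitRequest reports whether the planned request set is still under the peer's nominal
+// request limit.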
+func (rp *requestsPeer) canFitRequest() bool {
+ return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
+}
+
+// Returns true if the request was added and wasn't already present.
+func (rp *requestsPeer) addNextRequest(r Request) bool {
+ _, ok := rp.nextRequests[r]
+ if ok {
+ return false
+ }
+ rp.nextRequests[r] = struct{}{}
+ return true
+}
+
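+// peersForPieceRequests tracks how many requests a peer has absorbed for the piece currently
+// being assigned, so peers can be re-ranked between chunks.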
+type peersForPieceRequests struct {
+ requestsInPiece int
+ *requestsPeer
+}
+
+func (me *peersForPieceRequests) addNextRequest(r Request) {
+ if !me.requestsPeer.addNextRequest(r) {
+ return
+ }
+ me.requestsInPiece++
+}
+
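+// doRequests recomputes the desired request set for every peer across all torrents in one pass:
+// it ranks pieces globally, assigns chunks to peers piece by piece, then applies the resulting
+// plan to each peer.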
func (cl *Client) doRequests() {
requestOrder := &cl.pieceRequestOrder
requestOrder.pieces = requestOrder.pieces[:0]
- allPeers := make(map[*Torrent][]*Peer)
+ allPeers := make(map[*Torrent][]*requestsPeer)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
for _, t := range cl.torrents {
// TODO: We could do metainfo requests here.
- if t.haveInfo() {
- key := t.storage.Capacity
- if key != nil {
- if _, ok := storageLeft[key]; !ok {
- storageLeft[key] = (*key)()
- }
+ if !t.haveInfo() {
+ continue
+ }
+ key := t.storage.Capacity
+ if key != nil {
+ if _, ok := storageLeft[key]; !ok {
+ storageLeft[key] = (*key)()
}
- requestOrder.addPieces(t, t.numPieces())
}
- var peers []*Peer
+ var peers []*requestsPeer
t.iterPeers(func(p *Peer) {
if !p.closed.IsSet() {
- peers = append(peers, p)
+ peers = append(peers, &requestsPeer{
+ cur: p,
+ nextRequests: make(map[Request]struct{}),
+ })
}
})
- // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
- sort.Slice(peers, func(i, j int) bool {
- return multiless.New().Float64(
- peers[j].downloadRate(), peers[i].downloadRate(),
- ).Uintptr(
- uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
- })
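+ // Build the global piece request order for this torrent and count, per peer, how many of
+ // its pieces each peer could serve requests for.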
+ for i := range iter.N(t.numPieces()) {
+ tp := t.piece(i)
+ pp := tp.purePriority()
+ request := !t.ignorePieceForRequests(i)
+ requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
+ t: t,
+ index: i,
+ prio: pp,
+ partial: t.piecePartiallyDownloaded(i),
+ availability: tp.availability,
+ request: request,
+ })
+ if request {
+ for _, p := range peers {
+ if p.canRequestPiece(i) {
+ p.requestablePiecesRemaining++
+ }
+ }
+ }
+ }
allPeers[t] = peers
}
- requestOrder.update()
requestOrder.sort()
- // For a given piece, the set of allPeers indices that absorbed requests for the piece.
- contributed := make(map[int]struct{})
for _, p := range requestOrder.pieces {
- peers := allPeers[p.t]
torrentPiece := p.t.piece(p.index)
if left := storageLeft[p.t.storage.Capacity]; left != nil {
if *left < int64(torrentPiece.length()) {
continue
}
*left -= int64(torrentPiece.length())
}
- if p.t.ignorePieceForRequests(p.index) {
+ if !p.request {
continue
}
- p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
+ peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
+ for _, peer := range allPeers[p.t] {
+ peersForPiece = append(peersForPiece, &peersForPieceRequests{
+ requestsInPiece: 0,
+ requestsPeer: peer,
+ })
+ }
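+ // Rank peers for this piece: peers with room for more requests first, then fewer requests
+ // already taken in this piece, then fewer requestable pieces remaining elsewhere, then
+ // higher download rate, then the longer-established connection, with an arbitrary
+ // pointer-based tiebreak.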
+ sortPeersForPiece := func() {
+ sort.Slice(peersForPiece, func(i, j int) bool {
+ return multiless.New().Bool(
+ peersForPiece[j].canFitRequest(),
+ peersForPiece[i].canFitRequest(),
+ ).Int(
+ peersForPiece[i].requestsInPiece,
+ peersForPiece[j].requestsInPiece,
+ ).Int(
+ peersForPiece[i].requestablePiecesRemaining,
+ peersForPiece[j].requestablePiecesRemaining,
+ ).Float64(
+ peersForPiece[j].cur.downloadRate(),
+ peersForPiece[i].cur.downloadRate(),
+ ).EagerSameLess(
+ peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
+ peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
+ // TODO: Probably peer priority can come next
+ ).Uintptr(
+ uintptr(unsafe.Pointer(peersForPiece[j].cur)),
+ uintptr(unsafe.Pointer(peersForPiece[i].cur)),
+ ).Less()
+ })
+ }
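+ // pendingChunksRemaining starts at the piece's pending chunk count and is decremented as
+ // each undirtied chunk is handed out below.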
+ pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
+ torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
req := Request{pp.Integer(p.index), chunk}
- const skipAlreadyRequested = false
- if skipAlreadyRequested {
- alreadyRequested := false
- p.t.iterPeers(func(p *Peer) {
- if _, ok := p.requests[req]; ok {
- alreadyRequested = true
- }
- })
- if alreadyRequested {
+ pendingChunksRemaining--
+ sortPeersForPiece()
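+ // Prefer keeping an existing request for this chunk, but only if it sits with one of the
+ // top-ranked peers; the cutoff tightens as fewer chunks remain in the piece.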
+ for i, peer := range peersForPiece {
+ if i > pendingChunksRemaining {
+ break
+ }
+ if peer.hasExistingRequest(req) && peer.canFitRequest() {
+ peer.addNextRequest(req)
return true
}
}
- alreadyRequested := false
- for peerIndex, peer := range peers {
- if alreadyRequested {
- // Cancel all requests from "slower" peers after the one that requested it.
- peer.cancel(req)
- } else {
- err := peer.request(req)
- if err == nil {
- contributed[peerIndex] = struct{}{}
- alreadyRequested = true
- //log.Printf("requested %v", req)
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() {
+ continue
+ }
+ if !peer.hasPiece(p.index) {
+ continue
+ }
+ if !peer.pieceAllowedFast(p.index) {
+ // TODO: Verify that it's okay to stay uninterested if we only request allowed-fast
+ // pieces.
+ peer.nextInterest = true
+ if peer.choking() {
+ continue
}
}
+ peer.addNextRequest(req)
+ return true
}
return true
})
- // Move requestees for this piece to the back.
- lastIndex := len(peers) - 1
- // Probably should sort the contributees, to make the ordering more deterministic.
- for peerIndex := range contributed {
- peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
- delete(contributed, peerIndex)
- lastIndex--
+ for _, peer := range peersForPiece {
+ if peer.canRequestPiece(p.index) {
+ peer.requestablePiecesRemaining--
+ }
}
}
- for _, t := range cl.torrents {
- t.iterPeers(func(p *Peer) {
- if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
- p.setInterested(false)
+ for _, peers := range allPeers {
+ for _, rp := range peers {
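+ // Sanity check: every requestable piece should have been counted down to zero above.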
+ if rp.requestablePiecesRemaining != 0 {
+ panic(rp.requestablePiecesRemaining)
}
- })
+ applyPeerNextRequests(rp)
+ }
+ }
+}
+
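+// applyPeerNextRequests makes the peer match the computed plan: set the new interest state,
+// cancel requests that are no longer wanted, and issue the planned requests.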
+func applyPeerNextRequests(rp *requestsPeer) {
+ p := rp.cur
+ p.setInterested(rp.nextInterest)
+ for req := range p.requests {
+ if _, ok := rp.nextRequests[req]; !ok {
+ p.cancel(req)
+ }
+ }
+ for req := range rp.nextRequests {
+ err := p.request(req)
+ if err != nil {
+ panic(err)
+ } else {
+ //log.Print(req)
+ }
}
}