cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
- roi := cn.pieceRequestOrderIter()
fmt.Fprintf(w, " next pieces: %v%s\n",
- iter.ToSlice(iter.Head(10, roi)),
+ iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
func() string {
if cn.shouldRequestWithoutBias() {
return " (fastest)"
}
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
- numFillBuffers.Add(1)
- cancel, new, i := cn.desiredRequestState()
- if !cn.SetInterested(i, msg) {
- return
- }
- if cancel && len(cn.requests) != 0 {
- fillBufferSentCancels.Add(1)
- for r := range cn.requests {
- cn.deleteRequest(r)
- // log.Printf("%p: cancelling request: %v", cn, r)
- if !msg(makeCancelMessage(r)) {
- return
+ if !cn.t.networkingEnabled {
+ if !cn.SetInterested(false, msg) {
+ return
+ }
+ if len(cn.requests) != 0 {
+ for r := range cn.requests {
+ cn.deleteRequest(r)
+ // log.Printf("%p: cancelling request: %v", cn, r)
+ if !msg(makeCancelMessage(r)) {
+ return
+ }
}
}
}
- if len(new) != 0 {
- fillBufferSentRequests.Add(1)
- for _, r := range new {
- if !cn.request(r, msg) {
- // If we didn't completely top up the requests, we shouldn't
- // mark the low water, since we'll want to top up the requests
- // as soon as we have more write buffer space.
- return
- }
+ if len(cn.requests) <= cn.requestsLowWater {
+ filledBuffer := false
+ cn.iterPendingPieces(func(pieceIndex int) bool {
+ cn.iterPendingRequests(pieceIndex, func(r request) bool {
+ if !cn.SetInterested(true, msg) {
+ filledBuffer = true
+ return false
+ }
+ if len(cn.requests) >= cn.nominalMaxRequests() {
+ return false
+ }
+ // Choking is looked at here because our interest is dependent
+ // on whether we'd make requests in its absence.
+ if cn.PeerChoked {
+ if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
+ return false
+ }
+ }
+ if _, ok := cn.requests[r]; ok {
+ return true
+ }
+ filledBuffer = !cn.request(r, msg)
+ return !filledBuffer
+ })
+ return !filledBuffer
+ })
+ if filledBuffer {
+ // If we didn't completely top up the requests, we shouldn't mark
+ // the low water, since we'll want to top up the requests as soon
+ // as we have more write buffer space.
+ return
}
cn.requestsLowWater = len(cn.requests) / 2
}
+
cn.upload(msg)
}
cn.sentHaves = cn.t.completedPieces.Copy()
}
-// Determines interest and requests to send to a connected peer.
-func nextRequestState(
- networkingEnabled bool,
- currentRequests map[request]struct{},
- peerChoking bool,
- iterPendingRequests func(f func(request) bool),
- requestsLowWater int,
- requestsHighWater int,
- allowedFast bitmap.Bitmap,
-) (
- cancelExisting bool, // Cancel all our pending requests
- newRequests []request, // Chunks to request that we currently aren't
- interested bool, // Whether we should indicate interest, even if we don't request anything
-) {
- if !networkingEnabled {
- return true, nil, false
- }
- if len(currentRequests) > requestsLowWater {
- return false, nil, true
- }
- // If we have existing requests, better maintain interest to ensure we get
- // them. iterPendingRequests might not iterate over outstanding requests.
- interested = len(currentRequests) != 0
- iterPendingRequests(func(r request) bool {
- interested = true
- if peerChoking {
- if allowedFast.IsEmpty() {
- return false
- }
- if !allowedFast.Get(int(r.Index)) {
- return true
- }
- }
- if len(currentRequests)+len(newRequests) >= requestsHighWater {
- return false
- }
- if _, ok := currentRequests[r]; !ok {
- if newRequests == nil {
- newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
- }
- newRequests = append(newRequests, r)
- }
- return true
- })
- return
-}
-
func (cn *connection) updateRequests() {
// log.Print("update requests")
cn.tickleWriter()
}
}
-func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
+func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool {
now, readahead := cn.t.readerPiecePriorities()
var skip bitmap.Bitmap
if !cn.peerSentHaveAll {
- // Pieces to skip include pieces the peer doesn't have
+ // Pieces to skip include pieces the peer doesn't have.
skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
}
// And pieces that we already have.
skip.Union(cn.t.completedPieces)
+ skip.Union(cn.t.piecesQueuedForHash)
// Return an iterator over the different priority classes, minus the skip
// pieces.
- return iter.Chain(
+ return iter.All(
+ func(_piece interface{}) bool {
+ i := _piece.(pieceIndex)
+ if cn.t.hashingPiece(i) {
+ return true
+ }
+ return f(i)
+ },
iterBitmapsDistinct(&skip, now, readahead),
func(cb iter.Callback) {
cn.t.pendingPieces.IterTyped(func(piece int) bool {
return false
}
-func (cn *connection) pieceRequestOrderIter() iter.Func {
+func (cn *connection) iterPendingPieces(f func(int) bool) bool {
if cn.t.requestStrategy == 3 {
- return cn.unbiasedPieceRequestOrder()
+ return cn.iterUnbiasedPieceRequestOrder(f)
}
if cn.shouldRequestWithoutBias() {
- return cn.unbiasedPieceRequestOrder()
+ return cn.iterUnbiasedPieceRequestOrder(f)
} else {
- return cn.pieceRequestOrder.Iter
- }
-}
-
-func (cn *connection) iterPendingRequests(f func(request) bool) {
- cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
- piece := _piece.(int)
- return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
- r := request{pp.Integer(piece), cs}
- if cn.t.requestStrategy == 3 {
- lr := cn.t.lastRequested[r]
- if !lr.IsZero() {
- if time.Since(lr) < cn.t.duplicateRequestTimeout {
- return true
- } else {
- torrent.Add("requests duplicated due to timeout", 1)
- }
+ return cn.pieceRequestOrder.IterTyped(f)
+ }
+}
+
+func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
+ cn.iterPendingPieces(func(i int) bool { return f(i) })
+}
+
+func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool {
+ return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
+ r := request{pp.Integer(piece), cs}
+ if cn.t.requestStrategy == 3 {
+ lr := cn.t.lastRequested[r]
+ if !lr.IsZero() {
+ if time.Since(lr) < cn.t.duplicateRequestTimeout {
+ return true
+ } else {
+ torrent.Add("requests duplicated due to timeout", 1)
}
}
- return f(r)
- })
+ }
+ return f(r)
})
}
-func (cn *connection) desiredRequestState() (bool, []request, bool) {
- return nextRequestState(
- cn.t.networkingEnabled,
- cn.requests,
- cn.PeerChoked,
- cn.iterPendingRequests,
- cn.requestsLowWater,
- cn.nominalMaxRequests(),
- cn.peerAllowedFast,
- )
-}
-
func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
// TODO: Use "math/rand".Shuffle >= Go 1.10