From: Matt Joiner <anacrolix@gmail.com>
Date: Tue, 26 Jun 2018 04:51:55 +0000 (+1000)
Subject: Do requests synchronously, and don't request from hashing or queued pieces
X-Git-Tag: v1.0.0~117
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=c921242f30364d3d03947455a5a88ac376059444;p=btrtrc.git

Do requests synchronously, and don't request from hashing or queued pieces

Calculating the desired state was a nice idea, but too hard to debug. This way should also be faster.
---

diff --git a/connection.go b/connection.go
index cea8dab6..24da542b 100644
--- a/connection.go
+++ b/connection.go
@@ -303,9 +303,8 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
 		cn.statusFlags(),
 		cn.downloadRate()/(1<<10),
 	)
-	roi := cn.pieceRequestOrderIter()
 	fmt.Fprintf(w, "    next pieces: %v%s\n",
-		iter.ToSlice(iter.Head(10, roi)),
+		iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
 		func() string {
 			if cn.shouldRequestWithoutBias() {
 				return " (fastest)"
@@ -524,33 +523,55 @@ func (cn *connection) request(r request, mw messageWriter) bool {
 }
 
 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
-	numFillBuffers.Add(1)
-	cancel, new, i := cn.desiredRequestState()
-	if !cn.SetInterested(i, msg) {
-		return
-	}
-	if cancel && len(cn.requests) != 0 {
-		fillBufferSentCancels.Add(1)
-		for r := range cn.requests {
-			cn.deleteRequest(r)
-			// log.Printf("%p: cancelling request: %v", cn, r)
-			if !msg(makeCancelMessage(r)) {
-				return
+	if !cn.t.networkingEnabled {
+		if !cn.SetInterested(false, msg) {
+			return
+		}
+		if len(cn.requests) != 0 {
+			for r := range cn.requests {
+				cn.deleteRequest(r)
+				// log.Printf("%p: cancelling request: %v", cn, r)
+				if !msg(makeCancelMessage(r)) {
+					return
+				}
 			}
 		}
 	}
-	if len(new) != 0 {
-		fillBufferSentRequests.Add(1)
-		for _, r := range new {
-			if !cn.request(r, msg) {
-				// If we didn't completely top up the requests, we shouldn't
-				// mark the low water, since we'll want to top up the requests
-				// as soon as we have more write buffer space.
-				return
-			}
+	if len(cn.requests) <= cn.requestsLowWater {
+		filledBuffer := false
+		cn.iterPendingPieces(func(pieceIndex int) bool {
+			cn.iterPendingRequests(pieceIndex, func(r request) bool {
+				if !cn.SetInterested(true, msg) {
+					filledBuffer = true
+					return false
+				}
+				if len(cn.requests) >= cn.nominalMaxRequests() {
+					return false
+				}
+				// Choking is looked at here because our interest is dependent
+				// on whether we'd make requests in its absence.
+				if cn.PeerChoked {
+					if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
+						return false
+					}
+				}
+				if _, ok := cn.requests[r]; ok {
+					return true
+				}
+				filledBuffer = !cn.request(r, msg)
+				return !filledBuffer
+			})
+			return !filledBuffer
+		})
+		if filledBuffer {
+			// If we didn't completely top up the requests, we shouldn't mark
+			// the low water, since we'll want to top up the requests as soon
+			// as we have more write buffer space.
+			return
 		}
 		cn.requestsLowWater = len(cn.requests) / 2
 	}
+
 	cn.upload(msg)
 }
 
@@ -639,53 +660,6 @@ func (cn *connection) PostBitfield() {
 	cn.sentHaves = cn.t.completedPieces.Copy()
 }
 
-// Determines interest and requests to send to a connected peer.
-func nextRequestState(
-	networkingEnabled bool,
-	currentRequests map[request]struct{},
-	peerChoking bool,
-	iterPendingRequests func(f func(request) bool),
-	requestsLowWater int,
-	requestsHighWater int,
-	allowedFast bitmap.Bitmap,
-) (
-	cancelExisting bool, // Cancel all our pending requests
-	newRequests []request, // Chunks to request that we currently aren't
-	interested bool, // Whether we should indicate interest, even if we don't request anything
-) {
-	if !networkingEnabled {
-		return true, nil, false
-	}
-	if len(currentRequests) > requestsLowWater {
-		return false, nil, true
-	}
-	// If we have existing requests, better maintain interest to ensure we get
-	// them. iterPendingRequests might not iterate over outstanding requests.
-	interested = len(currentRequests) != 0
-	iterPendingRequests(func(r request) bool {
-		interested = true
-		if peerChoking {
-			if allowedFast.IsEmpty() {
-				return false
-			}
-			if !allowedFast.Get(int(r.Index)) {
-				return true
-			}
-		}
-		if len(currentRequests)+len(newRequests) >= requestsHighWater {
-			return false
-		}
-		if _, ok := currentRequests[r]; !ok {
-			if newRequests == nil {
-				newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
-			}
-			newRequests = append(newRequests, r)
-		}
-		return true
-	})
-	return
-}
-
 func (cn *connection) updateRequests() {
 	// log.Print("update requests")
 	cn.tickleWriter()
@@ -707,18 +681,26 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
 	}
 }
 
-func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
+func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool {
 	now, readahead := cn.t.readerPiecePriorities()
 	var skip bitmap.Bitmap
 	if !cn.peerSentHaveAll {
-		// Pieces to skip include pieces the peer doesn't have
+		// Pieces to skip include pieces the peer doesn't have.
 		skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
 	}
 	// And pieces that we already have.
 	skip.Union(cn.t.completedPieces)
+	skip.Union(cn.t.piecesQueuedForHash)
 	// Return an iterator over the different priority classes, minus the skip
 	// pieces.
-	return iter.Chain(
+	return iter.All(
+		func(_piece interface{}) bool {
+			i := _piece.(pieceIndex)
+			if cn.t.hashingPiece(i) {
+				return true
+			}
+			return f(i)
+		},
 		iterBitmapsDistinct(&skip, now, readahead),
 		func(cb iter.Callback) {
 			cn.t.pendingPieces.IterTyped(func(piece int) bool {
@@ -756,49 +738,38 @@ func (cn *connection) shouldRequestWithoutBias() bool {
 	return false
 }
 
-func (cn *connection) pieceRequestOrderIter() iter.Func {
+func (cn *connection) iterPendingPieces(f func(int) bool) bool {
 	if cn.t.requestStrategy == 3 {
-		return cn.unbiasedPieceRequestOrder()
+		return cn.iterUnbiasedPieceRequestOrder(f)
 	}
 	if cn.shouldRequestWithoutBias() {
-		return cn.unbiasedPieceRequestOrder()
+		return cn.iterUnbiasedPieceRequestOrder(f)
 	} else {
-		return cn.pieceRequestOrder.Iter
-	}
-}
-
-func (cn *connection) iterPendingRequests(f func(request) bool) {
-	cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
-		piece := _piece.(int)
-		return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
-			r := request{pp.Integer(piece), cs}
-			if cn.t.requestStrategy == 3 {
-				lr := cn.t.lastRequested[r]
-				if !lr.IsZero() {
-					if time.Since(lr) < cn.t.duplicateRequestTimeout {
-						return true
-					} else {
-						torrent.Add("requests duplicated due to timeout", 1)
-					}
+		return cn.pieceRequestOrder.IterTyped(f)
+	}
+}
+
+func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
+	cn.iterPendingPieces(func(i int) bool { return f(i) })
+}
+
+func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool {
+	return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
+		r := request{pp.Integer(piece), cs}
+		if cn.t.requestStrategy == 3 {
+			lr := cn.t.lastRequested[r]
+			if !lr.IsZero() {
+				if time.Since(lr) < cn.t.duplicateRequestTimeout {
+					return true
+				} else {
+					torrent.Add("requests duplicated due to timeout", 1)
 				}
 			}
-			return f(r)
-		})
+		}
+		return f(r)
 	})
 }
 
-func (cn *connection) desiredRequestState() (bool, []request, bool) {
-	return nextRequestState(
-		cn.t.networkingEnabled,
-		cn.requests,
-		cn.PeerChoked,
-		cn.iterPendingRequests,
-		cn.requestsLowWater,
-		cn.nominalMaxRequests(),
-		cn.peerAllowedFast,
-	)
-}
-
 func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
 	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
 	// TODO: Use "math/rand".Shuffle >= Go 1.10
diff --git a/global.go b/global.go
index 43b84ee1..bbec4b5f 100644
--- a/global.go
+++ b/global.go
@@ -49,8 +49,4 @@ var (
 	pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
 	pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
 	pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
-
-	fillBufferSentCancels  = expvar.NewInt("fillBufferSentCancels")
-	fillBufferSentRequests = expvar.NewInt("fillBufferSentRequests")
-	numFillBuffers         = expvar.NewInt("numFillBuffers")
 )
diff --git a/piece.go b/piece.go
index d29d1dc8..5e0c82a8 100644
--- a/piece.go
+++ b/piece.go
@@ -215,7 +215,7 @@ func (p *Piece) SetPriority(prio piecePriority) {
 }
 
 func (p *Piece) uncachedPriority() (ret piecePriority) {
-	if p.t.pieceComplete(p.index) {
+	if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
 		return PiecePriorityNone
 	}
 	for _, f := range p.files {
diff --git a/torrent.go b/torrent.go
index 5736dee5..15913979 100644
--- a/torrent.go
+++ b/torrent.go
@@ -1675,7 +1675,7 @@ func (t *Torrent) onIncompletePiece(piece int) {
 	}
 }
 
-func (t *Torrent) verifyPiece(piece int) {
+func (t *Torrent) verifyPiece(piece pieceIndex) {
 	cl := t.cl
 	cl.mu.Lock()
 	defer cl.mu.Unlock()
@@ -1690,18 +1690,20 @@ func (t *Torrent) verifyPiece(piece int) {
 	if !p.t.piecesQueuedForHash.Remove(piece) {
 		panic("piece was not queued")
 	}
+	t.updatePiecePriority(piece)
 	if t.closed.IsSet() || t.pieceComplete(piece) {
-		t.updatePiecePriority(piece)
 		return
 	}
 	p.hashing = true
 	t.publishPieceChange(piece)
+	t.updatePiecePriority(piece)
 	t.storageLock.RLock()
 	cl.mu.Unlock()
 	sum := t.hashPiece(piece)
 	t.storageLock.RUnlock()
 	cl.mu.Lock()
 	p.hashing = false
+	t.updatePiecePriority(piece)
 	t.pieceHashed(piece, sum == p.hash)
 	t.publishPieceChange(piece)
 }
@@ -1732,6 +1734,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex int) {
 	}
 	t.piecesQueuedForHash.Add(pieceIndex)
 	t.publishPieceChange(pieceIndex)
+	t.updatePiecePriority(pieceIndex)
 	go t.verifyPiece(pieceIndex)
 }