From: Matt Joiner <anacrolix@gmail.com>
Date: Sun, 12 Dec 2021 05:56:00 +0000 (+1100)
Subject: Shuffle duplicate requests
X-Git-Tag: v1.39.0~15
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=4cfdc2f497d3a616292f321c688b46ee494e151b;p=btrtrc.git

Shuffle duplicate requests

Add missing import
---

diff --git a/requesting.go b/requesting.go
index 7ffa19cc..a330df53 100644
--- a/requesting.go
+++ b/requesting.go
@@ -4,6 +4,7 @@ import (
 	"container/heap"
 	"context"
 	"encoding/gob"
+	"math/rand"
 	"reflect"
 	"runtime/pprof"
 	"time"
@@ -195,9 +196,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
 			allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
 			rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
 				r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
-				//if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
+				// if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
 				//	return
-				//}
+				// }
 				if !allowedFast {
 					// We must signal interest to request this
 					desired.Interested = true
@@ -257,7 +258,10 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
 	if !more {
 		return false
 	}
-	for _, req := range next.Requests {
+	shuffled := false
+	lastPending := 0
+	for i := 0; i < len(next.Requests); i++ {
+		req := next.Requests[i]
 		if p.cancelledRequests.Contains(req) {
 			// Waiting for a reject or piece message, which will suitably trigger us to update our
 			// requests, so we can skip this one with no additional consideration.
@@ -269,14 +273,38 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
 		// extra outstanding requests. We could subtract the number of outstanding cancels from the
 		// next request cardinality, but peers might not like that.
 		if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-			//log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
+			// log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
 			//	next.Requests.GetCardinality(),
 			//	p.cancelledRequests.GetCardinality(),
 			//	current.Requests.GetCardinality(),
 			//	p.nominalMaxRequests(),
-			//)
+			// )
 			break
 		}
+		otherPending := p.t.pendingRequests.Get(next.Requests[0])
+		if p.actualRequestState.Requests.Contains(next.Requests[0]) {
+			otherPending--
+		}
+		if otherPending < lastPending {
+			// Pending should only rise. It's supposed to be the strongest ordering criteria. If it
+			// doesn't, our shuffling condition could be wrong.
+			panic(lastPending)
+		}
+		// If the request has already been requested by another peer, shuffle this and the rest of
+		// the requests (since according to the increasing condition, the rest of the indices
+		// already have an outstanding request with another peer).
+		if !shuffled && otherPending > 0 {
+			shuffleReqs := next.Requests[i:]
+			rand.Shuffle(len(shuffleReqs), func(i, j int) {
+				shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
+			})
+			// log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
+			shuffled = true
+			// Repeat this index
+			i--
+			continue
+		}
+
 		more = p.mustRequest(req)
 		if !more {
 			break
diff --git a/requesting_test.go b/requesting_test.go
index dd54df5d..5e58df5c 100644
--- a/requesting_test.go
+++ b/requesting_test.go
@@ -4,6 +4,7 @@ import (
 	"testing"
 
 	pp "github.com/anacrolix/torrent/peer_protocol"
+	"github.com/bradfitz/iter"
 	qt "github.com/frankban/quicktest"
 )
 
@@ -40,3 +41,37 @@ func TestRequestMapOrderAcrossInstances(t *testing.T) {
 	// This shows that different map instances with the same contents can have the same range order.
 	qt.Assert(t, keysAsSlice(makeTypicalRequests()), qt.ContentEquals, keysAsSlice(makeTypicalRequests()))
 }
+
+// Added for testing repeating loop iteration after shuffling in Peer.applyRequestState.
+func TestForLoopRepeatItem(t *testing.T) {
+	t.Run("ExplicitLoopVar", func(t *testing.T) {
+		once := false
+		var seen []int
+		for i := 0; i < 4; i++ {
+			seen = append(seen, i)
+			if !once && i == 2 {
+				once = true
+				i--
+				// Will i++ still run?
+				continue
+			}
+		}
+		// We can mutate i and it's observed by the loop. No special treatment of the loop var.
+		qt.Assert(t, seen, qt.DeepEquals, []int{0, 1, 2, 2, 3})
+	})
+	t.Run("Range", func(t *testing.T) {
+		once := false
+		var seen []int
+		for i := range iter.N(4) {
+			seen = append(seen, i)
+			if !once && i == 2 {
+				once = true
+				// Can we actually modify the next value of i produced by the range?
+				i--
+				continue
+			}
+		}
+		// Range ignores any mutation to i.
+		qt.Assert(t, seen, qt.DeepEquals, []int{0, 1, 2, 3})
+	})
+}