From 63b0e42731c61b34d2172354424cd7892a836320 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Fri, 14 May 2021 13:06:12 +1000
Subject: [PATCH] Get max unverified bytes working

---
 request-strategy.go            |   8 +-
 request-strategy/order.go      | 149 ++++++++++++++++++---------------
 request-strategy/order_test.go |   8 +-
 request-strategy/piece.go      |   6 +-
 4 files changed, 92 insertions(+), 79 deletions(-)

diff --git a/request-strategy.go b/request-strategy.go
index 84473b40..8d8b3b23 100644
--- a/request-strategy.go
+++ b/request-strategy.go
@@ -24,11 +24,11 @@ func (cl *Client) requester() {
 }
 
 func (cl *Client) doRequests() {
-	ts := make([]*request_strategy.Torrent, 0, len(cl.torrents))
+	ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
 	for _, t := range cl.torrents {
-		rst := &request_strategy.Torrent{
-			StableId: uintptr(unsafe.Pointer(t)),
-			//MaxUnverifiedBytes: 1 << 20,
+		rst := request_strategy.Torrent{
+			StableId:           uintptr(unsafe.Pointer(t)),
+			MaxUnverifiedBytes: 10 << 20,
 		}
 		if t.storage != nil {
 			rst.Capacity = t.storage.Capacity
diff --git a/request-strategy/order.go b/request-strategy/order.go
index 4742254d..9052fb0c 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -16,46 +16,31 @@ type (
 	ChunkSpec = types.ChunkSpec
 )
 
-type ClientPieceOrder struct {
-	pieces []pieceRequestOrderPiece
-}
+type ClientPieceOrder struct{}
 
-type orderTorrent struct {
-	*Torrent
+type filterTorrent struct {
+	Torrent
 	unverifiedBytes int64
 	// Potentially shared with other torrents.
 	storageLeft *int64
-	peers       []*requestsPeer
-}
-
-type pieceRequestOrderPiece struct {
-	t     *orderTorrent
-	index pieceIndex
-	Piece
 }
 
-func (me *ClientPieceOrder) Len() int {
-	return len(me.pieces)
-}
-
-func (me ClientPieceOrder) sort() {
-	sort.Slice(me.pieces, me.less)
-}
-
-func (me ClientPieceOrder) less(_i, _j int) bool {
-	i := me.pieces[_i]
-	j := me.pieces[_j]
-	return multiless.New().Int(
-		int(j.Priority), int(i.Priority),
-	).Bool(
-		j.Partial, i.Partial,
-	).Int64(
-		i.Availability, j.Availability,
-	).Int(
-		i.index, j.index,
-	).Uintptr(
-		i.t.StableId, j.t.StableId,
-	).MustLess()
+func sortFilterPieces(pieces []filterPiece) {
+	sort.Slice(pieces, func(_i, _j int) bool {
+		i := pieces[_i]
+		j := pieces[_j]
+		return multiless.New().Int(
+			int(j.Priority), int(i.Priority),
+		).Bool(
+			j.Partial, i.Partial,
+		).Int64(
+			i.Availability, j.Availability,
+		).Int(
+			i.index, j.index,
+		).Uintptr(
+			i.t.StableId, j.t.StableId,
+		).MustLess()
+	})
 }
 
 type requestsPeer struct {
@@ -86,15 +71,27 @@ func (me *peersForPieceRequests) addNextRequest(r Request) {
 	me.requestsInPiece++
 }
 
-func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
-	requestOrder.pieces = requestOrder.pieces[:0]
+type requestablePiece struct {
+	index             pieceIndex
+	t                 Torrent
+	NumPendingChunks  int
+	IterPendingChunks ChunksIter
+}
+
+type filterPiece struct {
+	t     *filterTorrent
+	index pieceIndex
+	Piece
+}
+
+func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
 	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
 	// TorrentImpl.
 	storageLeft := make(map[*func() *int64]*int64)
-	orderTorrents := make([]*orderTorrent, 0, len(torrents))
+	var pieces []filterPiece
 	for _, _t := range torrents {
 		// TODO: We could do metainfo requests here.
-		t := &orderTorrent{
+		t := &filterTorrent{
 			Torrent:         _t,
 			unverifiedBytes: 0,
 		}
@@ -105,34 +102,16 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
 			}
 			t.storageLeft = storageLeft[key]
 		}
-		var peers []*requestsPeer
-		for _, p := range t.Peers {
-			peers = append(peers, &requestsPeer{
-				Peer: p,
-				nextState: PeerNextRequestState{
-					Requests: make(map[Request]struct{}),
-				},
-			})
-		}
 		for i, tp := range t.Pieces {
-			requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
+			pieces = append(pieces, filterPiece{
 				t:     t,
 				index: i,
 				Piece: tp,
 			})
-			if tp.Request && tp.NumPendingChunks != 0 {
-				for _, p := range peers {
-					if p.canRequestPiece(i) {
-						p.requestablePiecesRemaining++
-					}
-				}
-			}
 		}
-		t.peers = peers
-		orderTorrents = append(orderTorrents, t)
 	}
-	requestOrder.sort()
-	for _, piece := range requestOrder.pieces {
+	sortFilterPieces(pieces)
+	for _, piece := range pieces {
 		if left := piece.t.storageLeft; left != nil {
 			if *left < int64(piece.Length) {
 				continue
@@ -143,16 +122,48 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
 			continue
 		}
 		if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
-			//log.Print("skipping piece")
 			continue
 		}
-		allocatePendingChunks(piece, piece.t.peers)
 		piece.t.unverifiedBytes += piece.Length
-		//log.Print(piece.t.unverifiedBytes)
+		ret = append(ret, requestablePiece{
+			index:             piece.index,
+			t:                 piece.t.Torrent,
+			NumPendingChunks:  piece.NumPendingChunks,
+			IterPendingChunks: piece.iterPendingChunksWrapper,
+		})
+	}
+	return
+}
+
+// TODO: We could do metainfo requests here.
+func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState {
+	requestPieces := getRequestablePieces(torrents)
+	allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
+	for _, t := range torrents {
+		peers := make([]*requestsPeer, 0, len(t.Peers))
+		for _, p := range t.Peers {
+			peers = append(peers, &requestsPeer{
+				Peer: p,
+				nextState: PeerNextRequestState{
+					Requests: make(map[Request]struct{}),
+				},
+			})
+		}
+		allPeers[t.StableId] = peers
+	}
+	for _, piece := range requestPieces {
+		for _, peer := range allPeers[piece.t.StableId] {
+			if peer.canRequestPiece(piece.index) {
+				peer.requestablePiecesRemaining++
+			}
+		}
+	}
+	for _, piece := range requestPieces {
+		allocatePendingChunks(piece, allPeers[piece.t.StableId])
 	}
 	ret := make(map[PeerId]PeerNextRequestState)
-	for _, ots := range orderTorrents {
-		for _, rp := range ots.peers {
+	for _, peers := range allPeers {
+		for _, rp := range peers {
 			if rp.requestablePiecesRemaining != 0 {
 				panic(rp.requestablePiecesRemaining)
 			}
@@ -162,7 +173,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
 	return ret
 }
 
-func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
+func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
 	peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
 	for _, peer := range peers {
 		peersForPiece = append(peersForPiece, &peersForPieceRequests{
@@ -204,7 +215,7 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
 		})
 	}
 	preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
-	p.iterPendingChunksWrapper(func(spec ChunkSpec) {
+	p.IterPendingChunks(func(spec ChunkSpec) {
 		req := Request{pp.Integer(p.index), spec}
 		for _, peer := range peersForPiece {
 			if h := peer.HasExistingRequest; h == nil || !h(req) {
@@ -221,7 +232,7 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
 		}
 	})
 	pendingChunksRemaining := int(p.NumPendingChunks)
-	p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) {
+	p.IterPendingChunks(func(chunk types.ChunkSpec) {
 		if _, ok := preallocated[chunk]; ok {
 			return
 		}
@@ -243,11 +254,12 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
 				}
 			}
 			peer.addNextRequest(req)
-			return
+			break
 		}
 	})
 chunk:
 	for chunk, prePeer := range preallocated {
+		pendingChunksRemaining--
 		req := Request{pp.Integer(p.index), chunk}
 		prePeer.requestsInPiece--
 		sortPeersForPiece(&req)
@@ -266,7 +278,6 @@ chunk:
 					continue
 				}
 			}
-			pendingChunksRemaining--
 			peer.addNextRequest(req)
 			continue chunk
 		}
diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go
index 8c518b50..fd8b53f0 100644
--- a/request-strategy/order_test.go
+++ b/request-strategy/order_test.go
@@ -64,7 +64,7 @@ func TestStealingFromSlowerPeer(t *testing.T) {
 	firstStealer.Id = intPeerId(2)
 	secondStealer := basePeer
 	secondStealer.Id = intPeerId(3)
-	results := order.DoRequests([]*Torrent{{
+	results := order.DoRequests([]Torrent{{
 		Pieces: []Piece{{
 			Request:           true,
 			NumPendingChunks:  5,
@@ -111,7 +111,7 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
 	firstStealer.Id = intPeerId(2)
 	secondStealer := basePeer
 	secondStealer.Id = intPeerId(3)
-	results := order.DoRequests([]*Torrent{{
+	results := order.DoRequests([]Torrent{{
 		Pieces: []Piece{{
 			Request:           true,
 			NumPendingChunks:  2,
@@ -150,7 +150,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
 	firstStealer.Id = intPeerId(2)
 	secondStealer := basePeer
 	secondStealer.Id = intPeerId(3)
-	results := order.DoRequests([]*Torrent{{
+	results := order.DoRequests([]Torrent{{
 		Pieces: []Piece{{
 			Request:           true,
 			NumPendingChunks:  4,
@@ -198,7 +198,7 @@ func TestDontStealUnnecessarily(t *testing.T) {
 	firstStealer.Id = intPeerId(2)
 	secondStealer := basePeer
 	secondStealer.Id = intPeerId(3)
-	results := order.DoRequests([]*Torrent{{
+	results := order.DoRequests([]Torrent{{
 		Pieces: []Piece{{
 			Request:           true,
 			NumPendingChunks:  9,
diff --git a/request-strategy/piece.go b/request-strategy/piece.go
index 508ed829..bc59c052 100644
--- a/request-strategy/piece.go
+++ b/request-strategy/piece.go
@@ -4,6 +4,8 @@ import (
 	"github.com/anacrolix/torrent/types"
 )
 
+type ChunksIter func(func(types.ChunkSpec))
+
 type Piece struct {
 	Request           bool
 	Priority          piecePriority
@@ -11,10 +13,10 @@ type Piece struct {
 	Availability      int64
 	Length            int64
 	NumPendingChunks  int
-	IterPendingChunks func(func(types.ChunkSpec))
+	IterPendingChunks ChunksIter
 }
 
-func (p *Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
+func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
 	i := p.IterPendingChunks
 	if i != nil {
 		i(f)
-- 
2.51.0