From: Matt Joiner <anacrolix@gmail.com>
Date: Wed, 1 Dec 2021 03:38:47 +0000 (+1100)
Subject: Implement piece request ordering with retained state
X-Git-Tag: v1.39.0^2~15
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=94bb5d40ba35c977755adadf3dcf754c783e6a11;p=btrtrc.git

Implement piece request ordering with retained state
---

diff --git a/client.go b/client.go
index c491118b..8a686767 100644
--- a/client.go
+++ b/client.go
@@ -27,6 +27,7 @@ import (
 	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/anacrolix/missinggo/v2/pproffd"
 	"github.com/anacrolix/sync"
+	request_strategy "github.com/anacrolix/torrent/request-strategy"
 	"github.com/davecgh/go-spew/spew"
 	"github.com/dustin/go-humanize"
 	"github.com/google/btree"
@@ -74,6 +75,7 @@ type Client struct {
 	dopplegangerAddrs map[string]struct{}
 	badPeerIPs        map[string]struct{}
 	torrents          map[InfoHash]*Torrent
+	pieceRequestOrder map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder
 
 	acceptLimiter   map[ipStr]int
 	dialRateLimiter *rate.Limiter
diff --git a/piece.go b/piece.go
index bef5f59e..6caa7628 100644
--- a/piece.go
+++ b/piece.go
@@ -77,11 +77,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
 	p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
+	p.t.updatePieceRequestOrder(p.index)
 	p.readerCond.Broadcast()
 }
 
 func (p *Piece) pendChunkIndex(i RequestIndex) {
 	p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
+	p.t.updatePieceRequestOrder(p.index)
 }
 
 func (p *Piece) numChunks() chunkIndexType {
diff --git a/request-strategy/order.go b/request-strategy/order.go
index 91dfc74d..cacd7f96 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -3,12 +3,14 @@ package request_strategy
 import (
 	"bytes"
 	"expvar"
+	"log"
 	"runtime"
 	"sort"
 	"sync"
 
 	"github.com/anacrolix/multiless"
 	"github.com/anacrolix/torrent/metainfo"
+	"github.com/google/btree"
 
 	"github.com/anacrolix/torrent/types"
 )
@@ -60,6 +62,15 @@ func (me pieceSorter) Swap(i, j int) {
 func (me pieceSorter) Less(_i, _j int) bool {
 	i := me.get(_i)
 	j := me.get(_j)
+	return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
+}
+
+type pieceOrderInput struct {
+	PieceRequestOrderState
+	PieceRequestOrderKey
+}
+
+func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
 	return multiless.New().Int(
 		int(j.Priority), int(i.Priority),
 	).Bool(
@@ -67,13 +78,13 @@ func (me pieceSorter) Less(_i, _j int) bool {
 	).Int64(
 		i.Availability, j.Availability,
 	).Int(
-		i.index, j.index,
+		i.Index, j.Index,
 	).Lazy(func() multiless.Computation {
 		return multiless.New().Cmp(bytes.Compare(
-			i.t.InfoHash[:],
-			j.t.InfoHash[:],
+			i.InfoHash[:],
+			j.InfoHash[:],
 		))
-	}).MustLess()
+	})
 }
 
 type requestsPeer struct {
@@ -120,6 +131,15 @@ type filterPiece struct {
 	*Piece
 }
 
+func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
+	ret.Partial = fp.Partial
+	ret.InfoHash = fp.t.InfoHash
+	ret.Availability = fp.Availability
+	ret.Priority = fp.Priority
+	ret.Index = fp.index
+	return
+}
+
 var (
 	sortsMu sync.Mutex
 	sorts   = map[*[]filterPiece][]int{}
@@ -186,12 +206,45 @@ type pieceOrderingFinalizer struct {
 }
 
 // Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
-	maxPieces := 0
-	for i := range input.Torrents {
-		maxPieces += len(input.Torrents[i].Pieces)
+func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
+	if false {
+		maxPieces := 0
+		for i := range input.Torrents {
+			maxPieces += len(input.Torrents[i].Pieces)
+		}
+		pieces := make([]filterPiece, 0, maxPieces)
+		for _t := range input.Torrents {
+			// TODO: We could do metainfo requests here.
+			t := &input.Torrents[_t]
+			for i := range t.Pieces {
+				pieces = append(pieces, filterPiece{
+					t:     &input.Torrents[_t],
+					index: i,
+					Piece: &t.Pieces[i],
+				})
+			}
+		}
+		pieces = getSortedFilterPieces(pieces)
+		{
+			if len(pieces) != pro.tree.Len() {
+				panic("length doesn't match")
+			}
+			pieces := pieces
+			pro.tree.Ascend(func(i btree.Item) bool {
+				_i := i.(pieceRequestOrderItem)
+				ii := pieceOrderInput{
+					_i.state,
+					_i.key,
+				}
+				if pieces[0].toPieceOrderInput() != ii {
+					panic(_i)
+				}
+				pieces = pieces[1:]
+				return true
+			})
+		}
+		log.Printf("%v pieces passed", len(pieces))
 	}
-	pieces := make([]filterPiece, 0, maxPieces)
 	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
 	// TorrentImpl. A nil value means no capacity limit.
 	var storageLeft *int64
@@ -199,42 +252,41 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i
 		storageLeft = new(int64)
 		*storageLeft = *input.Capacity
 	}
-	for _t := range input.Torrents {
-		// TODO: We could do metainfo requests here.
-		t := &input.Torrents[_t]
-		for i := range t.Pieces {
-			pieces = append(pieces, filterPiece{
-				t:     &input.Torrents[_t],
-				index: i,
-				Piece: &t.Pieces[i],
-			})
-		}
-	}
-	pieces = getSortedFilterPieces(pieces)
 	var allTorrentsUnverifiedBytes int64
 	torrentUnverifiedBytes := map[metainfo.Hash]int64{}
-	for _, piece := range pieces {
+	pro.tree.Ascend(func(i btree.Item) bool {
+		_i := i.(pieceRequestOrderItem)
+		var piece *Piece
+		var t Torrent
+		for _, t = range input.Torrents {
+			if t.InfoHash == _i.key.InfoHash {
+				piece = &t.Pieces[_i.key.Index]
+				break
+			}
+		}
 		if left := storageLeft; left != nil {
 			if *left < piece.Length {
-				continue
+				return true
 			}
 			*left -= piece.Length
 		}
 		if !piece.Request || piece.NumPendingChunks == 0 {
 			// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
 			// considered unverified and hold up further requests.
-			continue
+
+			return true
 		}
-		if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
-			continue
+		if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
+			return true
 		}
 		if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
-			continue
+			return true
 		}
-		torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
+		torrentUnverifiedBytes[t.InfoHash] += piece.Length
 		allTorrentsUnverifiedBytes += piece.Length
-		f(piece.t, piece.Piece, piece.index)
-	}
+		f(&t, piece, _i.key.Index)
+		return true
+	})
 	return
 }
 
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
new file mode 100644
index 00000000..efe75604
--- /dev/null
+++ b/request-strategy/piece-request-order.go
@@ -0,0 +1,88 @@
+package request_strategy
+
+import (
+	"github.com/anacrolix/torrent/metainfo"
+	"github.com/google/btree"
+)
+
+func NewPieceOrder() *PieceRequestOrder {
+	return &PieceRequestOrder{
+		tree: btree.New(32),
+		keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
+	}
+}
+
+type PieceRequestOrder struct {
+	tree *btree.BTree
+	keys map[PieceRequestOrderKey]PieceRequestOrderState
+}
+
+type PieceRequestOrderKey struct {
+	InfoHash metainfo.Hash
+	Index    int
+}
+
+type PieceRequestOrderState struct {
+	Priority     piecePriority
+	Partial      bool
+	Availability int64
+}
+
+type pieceRequestOrderItem struct {
+	key   PieceRequestOrderKey
+	state PieceRequestOrderState
+}
+
+func (me pieceRequestOrderItem) Less(other btree.Item) bool {
+	otherConcrete := other.(pieceRequestOrderItem)
+	return pieceOrderLess(
+		pieceOrderInput{
+			PieceRequestOrderState: me.state,
+			PieceRequestOrderKey:   me.key,
+		},
+		pieceOrderInput{
+			PieceRequestOrderState: otherConcrete.state,
+			PieceRequestOrderKey:   otherConcrete.key,
+		},
+	).Less()
+}
+
+func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
+	if _, ok := me.keys[key]; ok {
+		panic(key)
+	}
+	if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
+		key:   key,
+		state: state,
+	}) != nil {
+		panic("shouldn't already have this")
+	}
+	me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
+	if me.tree.Delete(me.existingItemForKey(key)) == nil {
+		panic(key)
+	}
+	if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
+		key:   key,
+		state: state,
+	}) != nil {
+		panic(key)
+	}
+	me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
+	return pieceRequestOrderItem{
+		key:   key,
+		state: me.keys[key],
+	}
+}
+
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
+	if me.tree.Delete(me.existingItemForKey(key)) == nil {
+		panic(key)
+	}
+	delete(me.keys, key)
+}
diff --git a/requesting.go b/requesting.go
index 5ad98ea8..147e574d 100644
--- a/requesting.go
+++ b/requesting.go
@@ -27,9 +27,11 @@ func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input reques
 			input.Capacity = &cap
 		}
 	}
-	if input.Capacity == nil {
-		input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
-		return
+	if false {
+		if input.Capacity == nil {
+			input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
+			return
+		}
 	}
 	input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
 	for _, t := range cl.torrents {
@@ -58,20 +60,32 @@ func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
 	}
 	rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
 	for i := range t.pieces {
-		p := &t.pieces[i]
-		rst.Pieces = append(rst.Pieces, request_strategy.Piece{
-			Request:           !t.ignorePieceForRequests(i),
-			Priority:          p.purePriority(),
-			Partial:           t.piecePartiallyDownloaded(i),
-			Availability:      p.availability,
-			Length:            int64(p.length()),
-			NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
-			IterPendingChunks: &p.undirtiedChunksIter,
-		})
+		rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
 	}
 	return rst
 }
 
+func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
+	return request_strategy.PieceRequestOrderState{
+		Priority:     t.piece(i).purePriority(),
+		Partial:      t.piecePartiallyDownloaded(i),
+		Availability: t.piece(i).availability,
+	}
+}
+
+func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
+	p := &t.pieces[i]
+	return request_strategy.Piece{
+		Request:           !t.ignorePieceForRequests(i),
+		Priority:          p.purePriority(),
+		Partial:           t.piecePartiallyDownloaded(i),
+		Availability:      p.availability,
+		Length:            int64(p.length()),
+		NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
+		IterPendingChunks: &p.undirtiedChunksIter,
+	}
+}
+
 func init() {
 	gob.Register(peerId{})
 }
@@ -205,6 +219,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
 	}
 	request_strategy.GetRequestablePieces(
 		input,
+		p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
 		func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
 			if t.InfoHash != p.t.infoHash {
 				return
diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go
new file mode 100644
index 00000000..e1ab9670
--- /dev/null
+++ b/torrent-piece-request-order.go
@@ -0,0 +1,7 @@
+package torrent
+
+func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+	t.cl.pieceRequestOrder[t.storage.Capacity].Update(
+		t.pieceRequestOrderKey(pieceIndex),
+		t.requestStrategyPieceOrderState(pieceIndex))
+}
diff --git a/torrent.go b/torrent.go
index 7ccf13f6..c4ecf07f 100644
--- a/torrent.go
+++ b/torrent.go
@@ -27,6 +27,7 @@ import (
 	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/anacrolix/multiless"
 	"github.com/anacrolix/sync"
+	request_strategy "github.com/anacrolix/torrent/request-strategy"
 	"github.com/davecgh/go-spew/spew"
 	"github.com/pion/datachannel"
 
@@ -165,6 +166,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
 		panic(p.availability)
 	}
 	p.availability--
+	t.updatePieceRequestOrder(i)
 }
 
 func (t *Torrent) incPieceAvailability(i pieceIndex) {
@@ -172,6 +174,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
 	if t.haveInfo() {
 		p := t.piece(i)
 		p.availability++
+		t.updatePieceRequestOrder(i)
 	}
 }
 
@@ -424,8 +427,21 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
 	return nil
 }
 
+func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
+	return request_strategy.PieceRequestOrderKey{
+		InfoHash: t.infoHash,
+		Index:    i,
+	}
+}
+
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+	if t.cl.pieceRequestOrder == nil {
+		t.cl.pieceRequestOrder = make(map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder)
+	}
+	if t.cl.pieceRequestOrder[t.storage.Capacity] == nil {
+		t.cl.pieceRequestOrder[t.storage.Capacity] = request_strategy.NewPieceOrder()
+	}
 	for i := range t.pieces {
 		p := &t.pieces[i]
 		// Need to add availability before updating piece completion, as that may result in conns
@@ -434,6 +450,9 @@ func (t *Torrent) onSetInfo() {
 			panic(p.availability)
 		}
 		p.availability = int64(t.pieceAvailabilityFromPeers(i))
+		t.cl.pieceRequestOrder[t.storage.Capacity].Add(
+			t.pieceRequestOrderKey(i),
+			t.requestStrategyPieceOrderState(i))
 		t.updatePieceCompletion(pieceIndex(i))
 		if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
 			// t.logger.Printf("piece %s completion unknown, queueing check", p)
@@ -797,6 +816,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
 	return pieceIndex(t._completedPieces.GetCardinality())
 }
 
+func (t *Torrent) deletePieceRequestOrder() {
+	for i := 0; i < t.numPieces(); i++ {
+		t.cl.pieceRequestOrder[t.storage.Capacity].Delete(t.pieceRequestOrderKey(i))
+	}
+}
+
 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
 	t.closed.Set()
 	if t.storage != nil {
@@ -816,6 +841,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
 	t.iterPeers(func(p *Peer) {
 		p.close()
 	})
+	if t.storage != nil {
+		t.deletePieceRequestOrder()
+	}
 	t.pex.Reset()
 	t.cl.event.Broadcast()
 	t.pieceStateChanges.Close()
@@ -1102,6 +1130,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
 }
 
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
+	if !t.closed.IsSet() {
+		// It would be possible to filter on pure-priority changes here to avoid churning the piece
+		// request order.
+		t.updatePieceRequestOrder(piece)
+	}
 	p := &t.pieces[piece]
 	newPrio := p.uncachedPriority()
 	// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
@@ -1238,6 +1271,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
 	} else {
 		t._completedPieces.Remove(x)
 	}
+	p.t.updatePieceRequestOrder(piece)
 	t.updateComplete()
 	if complete && len(p.dirtiers) != 0 {
 		t.logger.Printf("marked piece %v complete but still has dirtiers", piece)