From 27b74769ed0111f2d15f5e436b2e42a5303f2bf0 Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Wed, 16 Jul 2025 14:59:18 +1000
Subject: [PATCH] Claude moved PeerConn methods from peer.go to peerconn.go

---
 peer.go     | 310 ---------------------------------------------------
 peerconn.go | 313 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 312 insertions(+), 311 deletions(-)

diff --git a/peer.go b/peer.go
index 7a3b10e1..6bcddd49 100644
--- a/peer.go
+++ b/peer.go
@@ -1,7 +1,6 @@
 package torrent
 
 import (
-	"errors"
 	"fmt"
 	"io"
 	"log/slog"
@@ -19,7 +18,6 @@ import (
 	"github.com/anacrolix/multiless"
 
 	"github.com/anacrolix/torrent/internal/alloclim"
-	requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
 	"github.com/anacrolix/torrent/mse"
 	pp "github.com/anacrolix/torrent/peer_protocol"
 	typedRoaring "github.com/anacrolix/torrent/typed-roaring"
@@ -146,10 +144,6 @@ func (p *Peer) Stats() (ret PeerStats) {
 	return
 }
 
-func (p *PeerConn) initRequestState() {
-	p.requestState.Requests = &peerRequests{}
-}
-
 func (cn *Peer) updateExpectingChunks() {
 	if cn.peerImpl.expectingChunks() {
 		if cn.lastStartedExpectingToReceiveChunks.IsZero() {
 			cn.lastStartedExpectingToReceiveChunks = time.Now()
 		}
 	} else {
 		if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
 			cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
 			cn.lastStartedExpectingToReceiveChunks = time.Time{}
 		}
 	}
 }
 
-func (cn *PeerConn) expectingChunks() bool {
-	if cn.requestState.Requests.IsEmpty() {
-		return false
-	}
-	if !cn.requestState.Interested {
-		return false
-	}
-	if !cn.peerChoking {
-		return true
-	}
-	haveAllowedFastRequests := false
-	cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
-		haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
-			cn.requestState.Requests,
-			cn.t.pieceRequestIndexBegin(i),
-			cn.t.pieceRequestIndexBegin(i+1),
-		) == 0
-		return !haveAllowedFastRequests
-	})
-	return haveAllowedFastRequests
-}
-
-func (cn *PeerConn) cumInterest() time.Duration {
-	ret := cn.priorInterest
-	if cn.requestState.Interested {
-		ret += time.Since(cn.lastBecameInterested)
-	}
-	return ret
-}
-
 func (cn *Peer) locker() *lockWithDeferreds {
 	return cn.t.cl.locker()
 }
 
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
-	_, ok := cn.PeerExtensionIDs[ext]
-	return ok
-}
-
 // The best guess at number of pieces in the torrent for this peer.
 func (cn *Peer) bestPeerNumPieces() pieceIndex {
 	if cn.t.haveInfo() {
 		return cn.t.numPieces()
 	}
 	return cn.peerMinPieces
 }
 
 func eventAgeString(t time.Time) string {
 	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
 }
 
-// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *PeerConn) statusFlags() (ret string) {
-	c := func(b byte) {
-		ret += string([]byte{b})
-	}
-	if cn.requestState.Interested {
-		c('i')
-	}
-	if cn.choking {
-		c('c')
-	}
-	c(':')
-	ret += cn.connectionFlags()
-	c(':')
-	if cn.peerInterested {
-		c('i')
-	}
-	if cn.peerChoking {
-		c('c')
-	}
-	return
-}
-
 func (cn *Peer) downloadRate() float64 {
 	num := cn._stats.BytesReadUsefulData.Int64()
 	if num == 0 {
 		return 0
 	}
 	return float64(num) / cn.totalExpectingTime().Seconds()
 }
 
 func (p *Peer) DownloadRate() float64 {
 	return p.Stats().DownloadRate
 }
 
-func (cn *PeerConn) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
-	var last Option[pieceIndex]
-	var count int
-	next := func(item Option[pieceIndex]) {
-		if item == last {
-			count++
-		} else {
-			if count != 0 {
-				f(last.Value, count)
-			}
-			last = item
-			count = 1
-		}
-	}
-	cn.requestState.Requests.Iterate(func(requestIndex requestStrategy.RequestIndex) bool {
-		next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
-		return true
-	})
-	next(None[pieceIndex]())
-}
-
-func (cn *PeerConn) peerImplWriteStatus(w io.Writer) {
-	prio, err := cn.peerPriority()
-	prioStr := fmt.Sprintf("%08x", prio)
-	if err != nil {
-		prioStr += ": " + err.Error()
-	}
-	fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
-	fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
-		eventAgeString(cn.lastMessageReceived),
-		eventAgeString(cn.completedHandshake),
-		eventAgeString(cn.lastHelpful()),
-		cn.cumInterest(),
-		cn.totalExpectingTime(),
-	)
-	fmt.Fprintf(w,
-		"%s completed, chunks uploaded: %v\n",
-		cn.completedString(),
-		&cn._stats.ChunksWritten,
-	)
-	fmt.Fprintf(w, "requested pieces:")
-	cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
-		fmt.Fprintf(w, " %v(%v)", piece, count)
-	})
-}
-
 func (cn *Peer) writeStatus(w io.Writer) {
 	// \t isn't preserved in <pre> blocks?
 	if cn.closed.IsSet() {
@@ -387,121 +277,10 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) {
 	return
 }
 
-func (cn *PeerConn) setInterested(interested bool) bool {
-	if cn.requestState.Interested == interested {
-		return true
-	}
-	cn.requestState.Interested = interested
-	if interested {
-		cn.lastBecameInterested = time.Now()
-	} else if !cn.lastBecameInterested.IsZero() {
-		cn.priorInterest += time.Since(cn.lastBecameInterested)
-	}
-	cn.updateExpectingChunks()
-	// log.Printf("%p: setting interest: %v", cn, interested)
-	return cn.writeInterested(interested)
-}
-
 // The function takes a message to be sent, and returns true if more messages
 // are okay.
 type messageWriter func(pp.Message) bool
 
-// This function seems to only be used by Peer.request. It's all logic checks, so maybe we can no-op it
-// when we want to go fast.
-func (cn *PeerConn) shouldRequest(r RequestIndex) error {
-	err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
-	if err != nil {
-		return err
-	}
-	pi := cn.t.pieceIndexOfRequestIndex(r)
-	if cn.requestState.Cancelled.Contains(r) {
-		return errors.New("request is cancelled and waiting acknowledgement")
-	}
-	if !cn.peerHasPiece(pi) {
-		return errors.New("requesting piece peer doesn't have")
-	}
-	if !cn.t.peerIsActive(cn.peerPtr()) {
-		panic("requesting but not in active conns")
-	}
-	if cn.closed.IsSet() {
-		panic("requesting when connection is closed")
-	}
-	if cn.t.hashingPiece(pi) {
-		panic("piece is being hashed")
-	}
-	p := cn.t.piece(pi)
-	if p.marking {
-		panic("piece is being marked")
-	}
-	if cn.t.pieceQueuedForHash(pi) {
-		panic("piece is queued for hash")
-	}
-	if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
-		// This could occur if we made a request with the fast extension, and then got choked and
-		// haven't had the request rejected yet.
-		if !cn.requestState.Requests.Contains(r) {
-			panic("peer choking and piece not allowed fast")
-		}
-	}
-	return nil
-}
-
-func (cn *PeerConn) mustRequest(r RequestIndex) bool {
-	more, err := cn.request(r)
-	if err != nil {
-		panic(err)
-	}
-	return more
-}
-
-func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
-	if err := cn.shouldRequest(r); err != nil {
-		panic(err)
-	}
-	if cn.requestState.Requests.Contains(r) {
-		return true, nil
-	}
-	if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
-		return true, errors.New("too many outstanding requests")
-	}
-	cn.requestState.Requests.Add(r)
-	if cn.validReceiveChunks == nil {
-		cn.validReceiveChunks = make(map[RequestIndex]int)
-	}
-	cn.validReceiveChunks[r]++
-	cn.t.requestState[r] = requestState{
-		peer: cn,
-		when: time.Now(),
-	}
-	cn.updateExpectingChunks()
-	ppReq := cn.t.requestIndexToRequest(r)
-	for _, f := range cn.callbacks.SentRequest {
-		f(PeerRequestEvent{cn.peerPtr(), ppReq})
-	}
-	return cn._request(ppReq), nil
-}
-
-func (me *PeerConn) cancel(r RequestIndex) {
-	if !me.deleteRequest(r) {
-		panic("request not existing should have been guarded")
-	}
-	me.handleCancel(r)
-	me.decPeakRequests()
-	if me.isLowOnRequests() {
-		me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
-	}
-}
-
-// Sets a reason to update requests and, if there wasn't already one, handles it.
-func (cn *PeerConn) onNeedUpdateRequests(reason updateRequestReason) {
-	if cn.needRequestUpdate != "" {
-		return
-	}
-	cn.needRequestUpdate = reason
-	// Run this before the Client lock is released.
-	cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
-}
-
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
 // skip is mutated during execution, and its initial values will never be
 // emitted.
@@ -575,32 +354,6 @@ func runSafeExtraneous(f func()) {
 	}
 }
 
-// Returns true if it was valid to reject the request.
-func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
-	if c.deleteRequest(r) {
-		c.decPeakRequests()
-	} else if !c.requestState.Cancelled.CheckedRemove(r) {
-		// The request was already cancelled.
-		return false
-	}
-	if c.isLowOnRequests() {
-		c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
-	}
-	c.decExpectedChunkReceive(r)
-	return true
-}
-
-func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
-	count := c.validReceiveChunks[r]
-	if count == 1 {
-		delete(c.validReceiveChunks, r)
-	} else if count > 1 {
-		c.validReceiveChunks[r] = count - 1
-	} else {
-		panic(r)
-	}
-}
-
 func (c *Peer) doChunkReadStats(size int64) {
 	c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
 }
@@ -746,61 +499,6 @@ func (c *Peer) peerHasWantedPieces() bool {
 	return c.peerPieces().Intersects(&c.t._pendingPieces)
 }
 
-// Returns true if an outstanding request is removed. Cancelled requests should be handled
-// separately.
-func (c *PeerConn) deleteRequest(r RequestIndex) bool {
-	if !c.requestState.Requests.CheckedRemove(r) {
-		return false
-	}
-	for _, f := range c.callbacks.DeletedRequest {
-		f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
-	}
-	c.updateExpectingChunks()
-	// TODO: Can't this happen if a request is stolen?
-	if c.t.requestingPeer(r) != c {
-		panic("only one peer should have a given request at a time")
-	}
-	delete(c.t.requestState, r)
-	// c.t.iterPeers(func(p *Peer) {
-	// 	if p.isLowOnRequests() {
-	// 		p.onNeedUpdateRequests("Peer.deleteRequest")
-	// 	}
-	// })
-	return true
-}
-
-func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
-	if c.requestState.Requests.IsEmpty() {
-		return
-	}
-	c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
-		if !c.deleteRequest(x) {
-			panic("request should exist")
-		}
-		return true
-	})
-	c.assertNoRequests()
-	c.t.iterPeers(func(p *Peer) {
-		if p.isLowOnRequests() {
-			p.onNeedUpdateRequests(reason)
-		}
-	})
-}
-
-func (c *PeerConn) assertNoRequests() {
-	if !c.requestState.Requests.IsEmpty() {
-		panic(c.requestState.Requests.GetCardinality())
-	}
-}
-
-func (c *PeerConn) cancelAllRequests() {
-	c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
-		c.cancel(x)
-		return true
-	})
-	c.assertNoRequests()
-}
-
 func (c *Peer) peerPriority() (peerPriority, error) {
 	return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
 }
@@ -851,16 +549,8 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
 	return pc, ok
 }
 
-func (p *PeerConn) uncancelledRequests() uint64 {
-	return p.requestState.Requests.GetCardinality()
-}
-
 type peerLocalPublicAddr = IpPort
 
-func (p *PeerConn) isLowOnRequests() bool {
-	return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
-}
-
 func (p *Peer) decPeakRequests() {
 	// // This can occur when peak requests are altered by the update request timer to be lower than
 	// // the actual number of outstanding requests. Let's let it go negative and see what happens. I
diff --git a/peerconn.go b/peerconn.go
index a3e0a9d0..a8d466ea 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -22,11 +22,13 @@ import (
 	"github.com/anacrolix/log"
 	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/anacrolix/multiless"
-	requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
+
 	"golang.org/x/time/rate"
 
 	"github.com/anacrolix/torrent/bencode"
 	"github.com/anacrolix/torrent/internal/alloclim"
+	requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
+
 	"github.com/anacrolix/torrent/merkle"
 	"github.com/anacrolix/torrent/metainfo"
 	"github.com/anacrolix/torrent/mse"
@@ -1496,6 +1498,315 @@ func (me *PeerConn) setPeerLoggers(a log.Logger, s *slog.Logger) {
 	me.protocolLogger = me.logger.WithNames(protocolLoggingName)
 }
 
+// Methods moved from peer.go (in their original order):
+
+func (p *PeerConn) initRequestState() {
+	p.requestState.Requests = &peerRequests{}
+}
+
+func (cn *PeerConn) expectingChunks() bool {
+	if cn.requestState.Requests.IsEmpty() {
+		return false
+	}
+	if !cn.requestState.Interested {
+		return false
+	}
+	if !cn.peerChoking {
+		return true
+	}
+	haveAllowedFastRequests := false
+	cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
+		haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
+			cn.requestState.Requests,
+			cn.t.pieceRequestIndexBegin(i),
+			cn.t.pieceRequestIndexBegin(i+1),
+		) == 0
+		return !haveAllowedFastRequests
+	})
+	return haveAllowedFastRequests
+}
+
+func (cn *PeerConn) cumInterest() time.Duration {
+	ret := cn.priorInterest
+	if cn.requestState.Interested {
+		ret += time.Since(cn.lastBecameInterested)
+	}
+	return ret
+}
+
+func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+	_, ok := cn.PeerExtensionIDs[ext]
+	return ok
+}
+
+// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
+func (cn *PeerConn) statusFlags() (ret string) {
+	c := func(b byte) {
+		ret += string([]byte{b})
+	}
+	if cn.requestState.Interested {
+		c('i')
+	}
+	if cn.choking {
+		c('c')
+	}
+	c(':')
+	ret += cn.connectionFlags()
+	c(':')
+	if cn.peerInterested {
+		c('i')
+	}
+	if cn.peerChoking {
+		c('c')
+	}
+	return
+}
+
+func (cn *PeerConn) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
+	var last Option[pieceIndex]
+	var count int
+	next := func(item Option[pieceIndex]) {
+		if item == last {
+			count++
+		} else {
+			if count != 0 {
+				f(last.Value, count)
+			}
+			last = item
+			count = 1
+		}
+	}
+	cn.requestState.Requests.Iterate(func(requestIndex requestStrategy.RequestIndex) bool {
+		next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
+		return true
+	})
+	next(None[pieceIndex]())
+}
+
+func (cn *PeerConn) peerImplWriteStatus(w io.Writer) {
+	prio, err := cn.peerPriority()
+	prioStr := fmt.Sprintf("%08x", prio)
+	if err != nil {
+		prioStr += ": " + err.Error()
+	}
+	fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+	fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+		eventAgeString(cn.lastMessageReceived),
+		eventAgeString(cn.completedHandshake),
+		eventAgeString(cn.lastHelpful()),
+		cn.cumInterest(),
+		cn.totalExpectingTime(),
+	)
+	fmt.Fprintf(w,
+		"%s completed, chunks uploaded: %v\n",
+		cn.completedString(),
+		&cn._stats.ChunksWritten,
+	)
+	fmt.Fprintf(w, "requested pieces:")
+	cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
+		fmt.Fprintf(w, " %v(%v)", piece, count)
+	})
+}
+
+func (cn *PeerConn) setInterested(interested bool) bool {
+	if cn.requestState.Interested == interested {
+		return true
+	}
+	cn.requestState.Interested = interested
+	if interested {
+		cn.lastBecameInterested = time.Now()
+	} else if !cn.lastBecameInterested.IsZero() {
+		cn.priorInterest += time.Since(cn.lastBecameInterested)
+	}
+	cn.updateExpectingChunks()
+	return cn.writeInterested(interested)
+}
+
+// This function seems to only be used by Peer.request. It's all logic checks, so maybe we can no-op it
+// when we want to go fast.
+func (cn *PeerConn) shouldRequest(r RequestIndex) error {
+	err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
+	if err != nil {
+		return err
+	}
+	pi := cn.t.pieceIndexOfRequestIndex(r)
+	if cn.requestState.Cancelled.Contains(r) {
+		return errors.New("request is cancelled and waiting acknowledgement")
+	}
+	if !cn.peerHasPiece(pi) {
+		return errors.New("requesting piece peer doesn't have")
+	}
+	if !cn.t.peerIsActive(cn.peerPtr()) {
+		panic("requesting but not in active conns")
+	}
+	if cn.closed.IsSet() {
+		panic("requesting when connection is closed")
+	}
+	if cn.t.hashingPiece(pi) {
+		panic("piece is being hashed")
+	}
+	p := cn.t.piece(pi)
+	if p.marking {
+		panic("piece is being marked")
+	}
+	if cn.t.pieceQueuedForHash(pi) {
+		panic("piece is queued for hash")
+	}
+	if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
+		// This could occur if we made a request with the fast extension, and then got choked and
+		// haven't had the request rejected yet.
+		if !cn.requestState.Requests.Contains(r) {
+			panic("peer choking and piece not allowed fast")
+		}
+	}
+	return nil
+}
+
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
+	more, err := cn.request(r)
+	if err != nil {
+		panic(err)
+	}
+	return more
+}
+
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
+	if err := cn.shouldRequest(r); err != nil {
+		panic(err)
+	}
+	if cn.requestState.Requests.Contains(r) {
+		return true, nil
+	}
+	if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+		return true, errors.New("too many outstanding requests")
+	}
+	cn.requestState.Requests.Add(r)
+	if cn.validReceiveChunks == nil {
+		cn.validReceiveChunks = make(map[RequestIndex]int)
+	}
+	cn.validReceiveChunks[r]++
+	cn.t.requestState[r] = requestState{
+		peer: cn,
+		when: time.Now(),
+	}
+	cn.updateExpectingChunks()
+	ppReq := cn.t.requestIndexToRequest(r)
+	for _, f := range cn.callbacks.SentRequest {
+		f(PeerRequestEvent{cn.peerPtr(), ppReq})
+	}
+	return cn._request(ppReq), nil
+}
+
+func (me *PeerConn) cancel(r RequestIndex) {
+	if !me.deleteRequest(r) {
+		panic("request not existing should have been guarded")
+	}
+	me.handleCancel(r)
+	me.decPeakRequests()
+	if me.isLowOnRequests() {
+		me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
+	}
+}
+
+// Sets a reason to update requests and, if there wasn't already one, handles it.
+func (cn *PeerConn) onNeedUpdateRequests(reason updateRequestReason) {
+	if cn.needRequestUpdate != "" {
+		return
+	}
+	cn.needRequestUpdate = reason
+	// Run this before the Client lock is released.
+	cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
+}
+
+// Returns true if it was valid to reject the request.
+func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
+	if c.deleteRequest(r) {
+		c.decPeakRequests()
+	} else if !c.requestState.Cancelled.CheckedRemove(r) {
+		// The request was already cancelled.
+		return false
+	}
+	if c.isLowOnRequests() {
+		c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
+	}
+	c.decExpectedChunkReceive(r)
+	return true
+}
+
+func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
+	count := c.validReceiveChunks[r]
+	if count == 1 {
+		delete(c.validReceiveChunks, r)
+	} else if count > 1 {
+		c.validReceiveChunks[r] = count - 1
+	} else {
+		panic(r)
+	}
+}
+
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
+func (c *PeerConn) deleteRequest(r RequestIndex) bool {
+	if !c.requestState.Requests.CheckedRemove(r) {
+		return false
+	}
+	for _, f := range c.callbacks.DeletedRequest {
+		f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
+	}
+	c.updateExpectingChunks()
+	// TODO: Can't this happen if a request is stolen?
+	if c.t.requestingPeer(r) != c {
+		panic("only one peer should have a given request at a time")
+	}
+	delete(c.t.requestState, r)
+	// c.t.iterPeers(func(p *Peer) {
+	// 	if p.isLowOnRequests() {
+	// 		p.onNeedUpdateRequests("Peer.deleteRequest")
+	// 	}
+	// })
+	return true
+}
+
+func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
+	if c.requestState.Requests.IsEmpty() {
+		return
+	}
+	c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
+		if !c.deleteRequest(x) {
+			panic("request should exist")
+		}
+		return true
+	})
+	c.assertNoRequests()
+	c.t.iterPeers(func(p *Peer) {
+		if p.isLowOnRequests() {
+			p.onNeedUpdateRequests(reason)
+		}
+	})
+}
+
+func (c *PeerConn) assertNoRequests() {
+	if !c.requestState.Requests.IsEmpty() {
+		panic(c.requestState.Requests.GetCardinality())
+	}
+}
+
+func (c *PeerConn) cancelAllRequests() {
+	c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
+		c.cancel(x)
+		return true
+	})
+	c.assertNoRequests()
+}
+
+func (p *PeerConn) uncancelledRequests() uint64 {
+	return p.requestState.Requests.GetCardinality()
+}
+
+func (p *PeerConn) isLowOnRequests() bool {
+	return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
+}
+
 func (c *PeerConn) checkReceivedChunk(req RequestIndex, msg *pp.Message, ppReq Request) (intended bool, err error) {
 	if c.validReceiveChunks[req] <= 0 {
 		ChunksReceived.Add("unexpected", 1)
-- 
2.51.0