]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Claude moved PeerConn methods from peer.go to peerconn.go
authorMatt Joiner <anacrolix@gmail.com>
Wed, 16 Jul 2025 04:59:18 +0000 (14:59 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 16 Jul 2025 04:59:42 +0000 (14:59 +1000)
peer.go
peerconn.go

diff --git a/peer.go b/peer.go
index 7a3b10e190bd99d83e6268384b37e0447cb4a4a2..6bcddd495b06e8914268e324009a9c465c5905c8 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -1,7 +1,6 @@
 package torrent
 
 import (
-       "errors"
        "fmt"
        "io"
        "log/slog"
@@ -19,7 +18,6 @@ import (
        "github.com/anacrolix/multiless"
 
        "github.com/anacrolix/torrent/internal/alloclim"
-       requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
        typedRoaring "github.com/anacrolix/torrent/typed-roaring"
@@ -146,10 +144,6 @@ func (p *Peer) Stats() (ret PeerStats) {
        return
 }
 
-func (p *PeerConn) initRequestState() {
-       p.requestState.Requests = &peerRequests{}
-}
-
 func (cn *Peer) updateExpectingChunks() {
        if cn.peerImpl.expectingChunks() {
                if cn.lastStartedExpectingToReceiveChunks.IsZero() {
@@ -163,45 +157,10 @@ func (cn *Peer) updateExpectingChunks() {
        }
 }
 
-func (cn *PeerConn) expectingChunks() bool {
-       if cn.requestState.Requests.IsEmpty() {
-               return false
-       }
-       if !cn.requestState.Interested {
-               return false
-       }
-       if !cn.peerChoking {
-               return true
-       }
-       haveAllowedFastRequests := false
-       cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
-               haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
-                       cn.requestState.Requests,
-                       cn.t.pieceRequestIndexBegin(i),
-                       cn.t.pieceRequestIndexBegin(i+1),
-               ) == 0
-               return !haveAllowedFastRequests
-       })
-       return haveAllowedFastRequests
-}
-
-func (cn *PeerConn) cumInterest() time.Duration {
-       ret := cn.priorInterest
-       if cn.requestState.Interested {
-               ret += time.Since(cn.lastBecameInterested)
-       }
-       return ret
-}
-
 func (cn *Peer) locker() *lockWithDeferreds {
        return cn.t.cl.locker()
 }
 
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
-       _, ok := cn.PeerExtensionIDs[ext]
-       return ok
-}
-
 // The best guess at number of pieces in the torrent for this peer.
 func (cn *Peer) bestPeerNumPieces() pieceIndex {
        if cn.t.haveInfo() {
@@ -230,29 +189,6 @@ func eventAgeString(t time.Time) string {
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
 }
 
-// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *PeerConn) statusFlags() (ret string) {
-       c := func(b byte) {
-               ret += string([]byte{b})
-       }
-       if cn.requestState.Interested {
-               c('i')
-       }
-       if cn.choking {
-               c('c')
-       }
-       c(':')
-       ret += cn.connectionFlags()
-       c(':')
-       if cn.peerInterested {
-               c('i')
-       }
-       if cn.peerChoking {
-               c('c')
-       }
-       return
-}
-
 func (cn *Peer) downloadRate() float64 {
        num := cn._stats.BytesReadUsefulData.Int64()
        if num == 0 {
@@ -266,52 +202,6 @@ func (p *Peer) DownloadRate() float64 {
        return p.Stats().DownloadRate
 }
 
-func (cn *PeerConn) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
-       var last Option[pieceIndex]
-       var count int
-       next := func(item Option[pieceIndex]) {
-               if item == last {
-                       count++
-               } else {
-                       if count != 0 {
-                               f(last.Value, count)
-                       }
-                       last = item
-                       count = 1
-               }
-       }
-       cn.requestState.Requests.Iterate(func(requestIndex requestStrategy.RequestIndex) bool {
-               next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
-               return true
-       })
-       next(None[pieceIndex]())
-}
-
-func (cn *PeerConn) peerImplWriteStatus(w io.Writer) {
-       prio, err := cn.peerPriority()
-       prioStr := fmt.Sprintf("%08x", prio)
-       if err != nil {
-               prioStr += ": " + err.Error()
-       }
-       fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
-       fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
-               eventAgeString(cn.lastMessageReceived),
-               eventAgeString(cn.completedHandshake),
-               eventAgeString(cn.lastHelpful()),
-               cn.cumInterest(),
-               cn.totalExpectingTime(),
-       )
-       fmt.Fprintf(w,
-               "%s completed, chunks uploaded: %v\n",
-               cn.completedString(),
-               &cn._stats.ChunksWritten,
-       )
-       fmt.Fprintf(w, "requested pieces:")
-       cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
-               fmt.Fprintf(w, " %v(%v)", piece, count)
-       })
-}
-
 func (cn *Peer) writeStatus(w io.Writer) {
        // \t isn't preserved in <pre> blocks?
        if cn.closed.IsSet() {
@@ -387,121 +277,10 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) {
        return
 }
 
-func (cn *PeerConn) setInterested(interested bool) bool {
-       if cn.requestState.Interested == interested {
-               return true
-       }
-       cn.requestState.Interested = interested
-       if interested {
-               cn.lastBecameInterested = time.Now()
-       } else if !cn.lastBecameInterested.IsZero() {
-               cn.priorInterest += time.Since(cn.lastBecameInterested)
-       }
-       cn.updateExpectingChunks()
-       // log.Printf("%p: setting interest: %v", cn, interested)
-       return cn.writeInterested(interested)
-}
-
 // The function takes a message to be sent, and returns true if more messages
 // are okay.
 type messageWriter func(pp.Message) bool
 
-// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
-// when we want to go fast.
-func (cn *PeerConn) shouldRequest(r RequestIndex) error {
-       err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
-       if err != nil {
-               return err
-       }
-       pi := cn.t.pieceIndexOfRequestIndex(r)
-       if cn.requestState.Cancelled.Contains(r) {
-               return errors.New("request is cancelled and waiting acknowledgement")
-       }
-       if !cn.peerHasPiece(pi) {
-               return errors.New("requesting piece peer doesn't have")
-       }
-       if !cn.t.peerIsActive(cn.peerPtr()) {
-               panic("requesting but not in active conns")
-       }
-       if cn.closed.IsSet() {
-               panic("requesting when connection is closed")
-       }
-       if cn.t.hashingPiece(pi) {
-               panic("piece is being hashed")
-       }
-       p := cn.t.piece(pi)
-       if p.marking {
-               panic("piece is being marked")
-       }
-       if cn.t.pieceQueuedForHash(pi) {
-               panic("piece is queued for hash")
-       }
-       if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
-               // This could occur if we made a request with the fast extension, and then got choked and
-               // haven't had the request rejected yet.
-               if !cn.requestState.Requests.Contains(r) {
-                       panic("peer choking and piece not allowed fast")
-               }
-       }
-       return nil
-}
-
-func (cn *PeerConn) mustRequest(r RequestIndex) bool {
-       more, err := cn.request(r)
-       if err != nil {
-               panic(err)
-       }
-       return more
-}
-
-func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
-       if err := cn.shouldRequest(r); err != nil {
-               panic(err)
-       }
-       if cn.requestState.Requests.Contains(r) {
-               return true, nil
-       }
-       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
-               return true, errors.New("too many outstanding requests")
-       }
-       cn.requestState.Requests.Add(r)
-       if cn.validReceiveChunks == nil {
-               cn.validReceiveChunks = make(map[RequestIndex]int)
-       }
-       cn.validReceiveChunks[r]++
-       cn.t.requestState[r] = requestState{
-               peer: cn,
-               when: time.Now(),
-       }
-       cn.updateExpectingChunks()
-       ppReq := cn.t.requestIndexToRequest(r)
-       for _, f := range cn.callbacks.SentRequest {
-               f(PeerRequestEvent{cn.peerPtr(), ppReq})
-       }
-       return cn._request(ppReq), nil
-}
-
-func (me *PeerConn) cancel(r RequestIndex) {
-       if !me.deleteRequest(r) {
-               panic("request not existing should have been guarded")
-       }
-       me.handleCancel(r)
-       me.decPeakRequests()
-       if me.isLowOnRequests() {
-               me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
-       }
-}
-
-// Sets a reason to update requests, and if there wasn't already one, handle it.
-func (cn *PeerConn) onNeedUpdateRequests(reason updateRequestReason) {
-       if cn.needRequestUpdate != "" {
-               return
-       }
-       cn.needRequestUpdate = reason
-       // Run this before the Client lock is released.
-       cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
-}
-
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
 // skip is mutated during execution, and its initial values will never be
 // emitted.
@@ -575,32 +354,6 @@ func runSafeExtraneous(f func()) {
        }
 }
 
-// Returns true if it was valid to reject the request.
-func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
-       if c.deleteRequest(r) {
-               c.decPeakRequests()
-       } else if !c.requestState.Cancelled.CheckedRemove(r) {
-               // The request was already cancelled.
-               return false
-       }
-       if c.isLowOnRequests() {
-               c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
-       }
-       c.decExpectedChunkReceive(r)
-       return true
-}
-
-func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
-       count := c.validReceiveChunks[r]
-       if count == 1 {
-               delete(c.validReceiveChunks, r)
-       } else if count > 1 {
-               c.validReceiveChunks[r] = count - 1
-       } else {
-               panic(r)
-       }
-}
-
 func (c *Peer) doChunkReadStats(size int64) {
        c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
 }
@@ -746,61 +499,6 @@ func (c *Peer) peerHasWantedPieces() bool {
        return c.peerPieces().Intersects(&c.t._pendingPieces)
 }
 
-// Returns true if an outstanding request is removed. Cancelled requests should be handled
-// separately.
-func (c *PeerConn) deleteRequest(r RequestIndex) bool {
-       if !c.requestState.Requests.CheckedRemove(r) {
-               return false
-       }
-       for _, f := range c.callbacks.DeletedRequest {
-               f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
-       }
-       c.updateExpectingChunks()
-       // TODO: Can't this happen if a request is stolen?
-       if c.t.requestingPeer(r) != c {
-               panic("only one peer should have a given request at a time")
-       }
-       delete(c.t.requestState, r)
-       // c.t.iterPeers(func(p *Peer) {
-       //      if p.isLowOnRequests() {
-       //              p.onNeedUpdateRequests("Peer.deleteRequest")
-       //      }
-       // })
-       return true
-}
-
-func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
-       if c.requestState.Requests.IsEmpty() {
-               return
-       }
-       c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
-               if !c.deleteRequest(x) {
-                       panic("request should exist")
-               }
-               return true
-       })
-       c.assertNoRequests()
-       c.t.iterPeers(func(p *Peer) {
-               if p.isLowOnRequests() {
-                       p.onNeedUpdateRequests(reason)
-               }
-       })
-}
-
-func (c *PeerConn) assertNoRequests() {
-       if !c.requestState.Requests.IsEmpty() {
-               panic(c.requestState.Requests.GetCardinality())
-       }
-}
-
-func (c *PeerConn) cancelAllRequests() {
-       c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
-               c.cancel(x)
-               return true
-       })
-       c.assertNoRequests()
-}
-
 func (c *Peer) peerPriority() (peerPriority, error) {
        return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
 }
@@ -851,16 +549,8 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        return pc, ok
 }
 
-func (p *PeerConn) uncancelledRequests() uint64 {
-       return p.requestState.Requests.GetCardinality()
-}
-
 type peerLocalPublicAddr = IpPort
 
-func (p *PeerConn) isLowOnRequests() bool {
-       return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
-}
-
 func (p *Peer) decPeakRequests() {
        // // This can occur when peak requests are altered by the update request timer to be lower than
        // // the actual number of outstanding requests. Let's let it go negative and see what happens. I
index a3e0a9d079e276518005c4dfb1813f65d3436cb5..a8d466ea970e04eca717f41ae69fb1853b6fefdb 100644 (file)
@@ -22,11 +22,13 @@ import (
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
-       requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
+
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/internal/alloclim"
+       requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
+
        "github.com/anacrolix/torrent/merkle"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
@@ -1496,6 +1498,315 @@ func (me *PeerConn) setPeerLoggers(a log.Logger, s *slog.Logger) {
        me.protocolLogger = me.logger.WithNames(protocolLoggingName)
 }
 
+// Methods moved from peer.go (in their original order):
+
+func (p *PeerConn) initRequestState() {
+       p.requestState.Requests = &peerRequests{}
+}
+
+func (cn *PeerConn) expectingChunks() bool {
+       if cn.requestState.Requests.IsEmpty() {
+               return false
+       }
+       if !cn.requestState.Interested {
+               return false
+       }
+       if !cn.peerChoking {
+               return true
+       }
+       haveAllowedFastRequests := false
+       cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
+               haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
+                       cn.requestState.Requests,
+                       cn.t.pieceRequestIndexBegin(i),
+                       cn.t.pieceRequestIndexBegin(i+1),
+               ) == 0
+               return !haveAllowedFastRequests
+       })
+       return haveAllowedFastRequests
+}
+
+func (cn *PeerConn) cumInterest() time.Duration {
+       ret := cn.priorInterest
+       if cn.requestState.Interested {
+               ret += time.Since(cn.lastBecameInterested)
+       }
+       return ret
+}
+
+func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+       _, ok := cn.PeerExtensionIDs[ext]
+       return ok
+}
+
+// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
+func (cn *PeerConn) statusFlags() (ret string) {
+       c := func(b byte) {
+               ret += string([]byte{b})
+       }
+       if cn.requestState.Interested {
+               c('i')
+       }
+       if cn.choking {
+               c('c')
+       }
+       c(':')
+       ret += cn.connectionFlags()
+       c(':')
+       if cn.peerInterested {
+               c('i')
+       }
+       if cn.peerChoking {
+               c('c')
+       }
+       return
+}
+
+func (cn *PeerConn) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
+       var last Option[pieceIndex]
+       var count int
+       next := func(item Option[pieceIndex]) {
+               if item == last {
+                       count++
+               } else {
+                       if count != 0 {
+                               f(last.Value, count)
+                       }
+                       last = item
+                       count = 1
+               }
+       }
+       cn.requestState.Requests.Iterate(func(requestIndex requestStrategy.RequestIndex) bool {
+               next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
+               return true
+       })
+       next(None[pieceIndex]())
+}
+
+func (cn *PeerConn) peerImplWriteStatus(w io.Writer) {
+       prio, err := cn.peerPriority()
+       prioStr := fmt.Sprintf("%08x", prio)
+       if err != nil {
+               prioStr += ": " + err.Error()
+       }
+       fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+       fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+               eventAgeString(cn.lastMessageReceived),
+               eventAgeString(cn.completedHandshake),
+               eventAgeString(cn.lastHelpful()),
+               cn.cumInterest(),
+               cn.totalExpectingTime(),
+       )
+       fmt.Fprintf(w,
+               "%s completed, chunks uploaded: %v\n",
+               cn.completedString(),
+               &cn._stats.ChunksWritten,
+       )
+       fmt.Fprintf(w, "requested pieces:")
+       cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
+               fmt.Fprintf(w, " %v(%v)", piece, count)
+       })
+}
+
+func (cn *PeerConn) setInterested(interested bool) bool {
+       if cn.requestState.Interested == interested {
+               return true
+       }
+       cn.requestState.Interested = interested
+       if interested {
+               cn.lastBecameInterested = time.Now()
+       } else if !cn.lastBecameInterested.IsZero() {
+               cn.priorInterest += time.Since(cn.lastBecameInterested)
+       }
+       cn.updateExpectingChunks()
+       return cn.writeInterested(interested)
+}
+
+// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
+// when we want to go fast.
+func (cn *PeerConn) shouldRequest(r RequestIndex) error {
+       err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
+       if err != nil {
+               return err
+       }
+       pi := cn.t.pieceIndexOfRequestIndex(r)
+       if cn.requestState.Cancelled.Contains(r) {
+               return errors.New("request is cancelled and waiting acknowledgement")
+       }
+       if !cn.peerHasPiece(pi) {
+               return errors.New("requesting piece peer doesn't have")
+       }
+       if !cn.t.peerIsActive(cn.peerPtr()) {
+               panic("requesting but not in active conns")
+       }
+       if cn.closed.IsSet() {
+               panic("requesting when connection is closed")
+       }
+       if cn.t.hashingPiece(pi) {
+               panic("piece is being hashed")
+       }
+       p := cn.t.piece(pi)
+       if p.marking {
+               panic("piece is being marked")
+       }
+       if cn.t.pieceQueuedForHash(pi) {
+               panic("piece is queued for hash")
+       }
+       if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
+               // This could occur if we made a request with the fast extension, and then got choked and
+               // haven't had the request rejected yet.
+               if !cn.requestState.Requests.Contains(r) {
+                       panic("peer choking and piece not allowed fast")
+               }
+       }
+       return nil
+}
+
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
+       more, err := cn.request(r)
+       if err != nil {
+               panic(err)
+       }
+       return more
+}
+
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
+       if err := cn.shouldRequest(r); err != nil {
+               panic(err)
+       }
+       if cn.requestState.Requests.Contains(r) {
+               return true, nil
+       }
+       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+               return true, errors.New("too many outstanding requests")
+       }
+       cn.requestState.Requests.Add(r)
+       if cn.validReceiveChunks == nil {
+               cn.validReceiveChunks = make(map[RequestIndex]int)
+       }
+       cn.validReceiveChunks[r]++
+       cn.t.requestState[r] = requestState{
+               peer: cn,
+               when: time.Now(),
+       }
+       cn.updateExpectingChunks()
+       ppReq := cn.t.requestIndexToRequest(r)
+       for _, f := range cn.callbacks.SentRequest {
+               f(PeerRequestEvent{cn.peerPtr(), ppReq})
+       }
+       return cn._request(ppReq), nil
+}
+
+func (me *PeerConn) cancel(r RequestIndex) {
+       if !me.deleteRequest(r) {
+               panic("request not existing should have been guarded")
+       }
+       me.handleCancel(r)
+       me.decPeakRequests()
+       if me.isLowOnRequests() {
+               me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
+       }
+}
+
+// Sets a reason to update requests, and if there wasn't already one, handle it.
+func (cn *PeerConn) onNeedUpdateRequests(reason updateRequestReason) {
+       if cn.needRequestUpdate != "" {
+               return
+       }
+       cn.needRequestUpdate = reason
+       // Run this before the Client lock is released.
+       cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
+}
+
+// Returns true if it was valid to reject the request.
+func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
+       if c.deleteRequest(r) {
+               c.decPeakRequests()
+       } else if !c.requestState.Cancelled.CheckedRemove(r) {
+               // The request was already cancelled.
+               return false
+       }
+       if c.isLowOnRequests() {
+               c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
+       }
+       c.decExpectedChunkReceive(r)
+       return true
+}
+
+func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
+       count := c.validReceiveChunks[r]
+       if count == 1 {
+               delete(c.validReceiveChunks, r)
+       } else if count > 1 {
+               c.validReceiveChunks[r] = count - 1
+       } else {
+               panic(r)
+       }
+}
+
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
+func (c *PeerConn) deleteRequest(r RequestIndex) bool {
+       if !c.requestState.Requests.CheckedRemove(r) {
+               return false
+       }
+       for _, f := range c.callbacks.DeletedRequest {
+               f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
+       }
+       c.updateExpectingChunks()
+       // TODO: Can't this happen if a request is stolen?
+       if c.t.requestingPeer(r) != c {
+               panic("only one peer should have a given request at a time")
+       }
+       delete(c.t.requestState, r)
+       // c.t.iterPeers(func(p *Peer) {
+       //      if p.isLowOnRequests() {
+       //              p.onNeedUpdateRequests("Peer.deleteRequest")
+       //      }
+       // })
+       return true
+}
+
+func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
+       if c.requestState.Requests.IsEmpty() {
+               return
+       }
+       c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
+               if !c.deleteRequest(x) {
+                       panic("request should exist")
+               }
+               return true
+       })
+       c.assertNoRequests()
+       c.t.iterPeers(func(p *Peer) {
+               if p.isLowOnRequests() {
+                       p.onNeedUpdateRequests(reason)
+               }
+       })
+}
+
+func (c *PeerConn) assertNoRequests() {
+       if !c.requestState.Requests.IsEmpty() {
+               panic(c.requestState.Requests.GetCardinality())
+       }
+}
+
+func (c *PeerConn) cancelAllRequests() {
+       c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
+               c.cancel(x)
+               return true
+       })
+       c.assertNoRequests()
+}
+
+func (p *PeerConn) uncancelledRequests() uint64 {
+       return p.requestState.Requests.GetCardinality()
+}
+
+func (p *PeerConn) isLowOnRequests() bool {
+       return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
+}
+
 func (c *PeerConn) checkReceivedChunk(req RequestIndex, msg *pp.Message, ppReq Request) (intended bool, err error) {
        if c.validReceiveChunks[req] <= 0 {
                ChunksReceived.Add("unexpected", 1)