Sergey Matveev's repositories - btrtrc.git/commitdiff
Move requestState into PeerConn so cancelAllRequests doesn't crash
authorMatt Joiner <anacrolix@gmail.com>
Tue, 15 Jul 2025 06:36:01 +0000 (16:36 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 15 Jul 2025 06:36:01 +0000 (16:36 +1000)
peer-impl.go
peer.go
peerconn.go
requesting.go
torrent.go
webseed-peer.go

index 4bae3f8cd175441f3af095513d58748f95c2dbba..aa0fe9feae101d5a1e9b459804d9342598eabfcc 100644 (file)
@@ -24,6 +24,7 @@ type legacyPeerImpl interface {
 
        // handleCancel initiates cancellation of a request
        handleCancel(ri RequestIndex)
+       cancelAllRequests()
        connectionFlags() string
        onClose()
        onGotInfo(info *metainfo.Info)
diff --git a/peer.go b/peer.go
index a9e13d1efb7af970e72297b1c977c9ddc9d5adde..7a3b10e190bd99d83e6268384b37e0447cb4a4a2 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -71,9 +71,7 @@ type (
                lastChunkSent       time.Time
 
                // Stuff controlled by the local peer.
-               needRequestUpdate updateRequestReason
-               // TODO: How are pending cancels handled for webseed peers?
-               requestState         requestStrategy.PeerRequestState
+               needRequestUpdate    updateRequestReason
                updateRequestsTimer  *time.Timer
                lastRequestUpdate    time.Time
                peakRequests         maxRequests
@@ -148,7 +146,7 @@ func (p *Peer) Stats() (ret PeerStats) {
        return
 }
 
-func (p *Peer) initRequestState() {
+func (p *PeerConn) initRequestState() {
        p.requestState.Requests = &peerRequests{}
 }
 
@@ -187,7 +185,7 @@ func (cn *PeerConn) expectingChunks() bool {
        return haveAllowedFastRequests
 }
 
-func (cn *Peer) cumInterest() time.Duration {
+func (cn *PeerConn) cumInterest() time.Duration {
        ret := cn.priorInterest
        if cn.requestState.Interested {
                ret += time.Since(cn.lastBecameInterested)
@@ -233,7 +231,7 @@ func eventAgeString(t time.Time) string {
 }
 
 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *Peer) statusFlags() (ret string) {
+func (cn *PeerConn) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
@@ -410,7 +408,7 @@ type messageWriter func(pp.Message) bool
 
 // This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
 // when we want to go fast.
-func (cn *Peer) shouldRequest(r RequestIndex) error {
+func (cn *PeerConn) shouldRequest(r RequestIndex) error {
        err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
        if err != nil {
                return err
@@ -422,7 +420,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        if !cn.peerHasPiece(pi) {
                return errors.New("requesting piece peer doesn't have")
        }
-       if !cn.t.peerIsActive(cn) {
+       if !cn.t.peerIsActive(cn.peerPtr()) {
                panic("requesting but not in active conns")
        }
        if cn.closed.IsSet() {
@@ -472,7 +470,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
        }
        cn.validReceiveChunks[r]++
        cn.t.requestState[r] = requestState{
-               peer: cn.peerPtr(),
+               peer: cn,
                when: time.Now(),
        }
        cn.updateExpectingChunks()
@@ -483,7 +481,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
        return cn._request(ppReq), nil
 }
 
-func (me *Peer) cancel(r RequestIndex) {
+func (me *PeerConn) cancel(r RequestIndex) {
        if !me.deleteRequest(r) {
                panic("request not existing should have been guarded")
        }
@@ -669,7 +667,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        // Cancel pending requests for this chunk from *other* peers.
        if p := t.requestingPeer(req); p != nil {
-               if p == c {
+               if p.peerPtr() == c {
                        p.logger.Slogger().Error("received chunk but still pending request", "peer", p, "req", req)
                        panic("should not be pending request from conn that just received it")
                }
@@ -750,12 +748,12 @@ func (c *Peer) peerHasWantedPieces() bool {
 
 // Returns true if an outstanding request is removed. Cancelled requests should be handled
 // separately.
-func (c *Peer) deleteRequest(r RequestIndex) bool {
+func (c *PeerConn) deleteRequest(r RequestIndex) bool {
        if !c.requestState.Requests.CheckedRemove(r) {
                return false
        }
        for _, f := range c.callbacks.DeletedRequest {
-               f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
+               f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
        }
        c.updateExpectingChunks()
        // TODO: Can't this happen if a request is stolen?
@@ -771,7 +769,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        return true
 }
 
-func (c *Peer) deleteAllRequests(reason updateRequestReason) {
+func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
        if c.requestState.Requests.IsEmpty() {
                return
        }
@@ -789,13 +787,13 @@ func (c *Peer) deleteAllRequests(reason updateRequestReason) {
        })
 }
 
-func (c *Peer) assertNoRequests() {
+func (c *PeerConn) assertNoRequests() {
        if !c.requestState.Requests.IsEmpty() {
                panic(c.requestState.Requests.GetCardinality())
        }
 }
 
-func (c *Peer) cancelAllRequests() {
+func (c *PeerConn) cancelAllRequests() {
        c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
                c.cancel(x)
                return true
@@ -853,7 +851,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        return pc, ok
 }
 
-func (p *Peer) uncancelledRequests() uint64 {
+func (p *PeerConn) uncancelledRequests() uint64 {
        return p.requestState.Requests.GetCardinality()
 }
 
index 5ccf23cec4cdab7179f3b2e2932184ff3c859d73..a3e0a9d079e276518005c4dfb1813f65d3436cb5 100644 (file)
@@ -22,6 +22,7 @@ import (
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
+       requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
@@ -95,6 +96,9 @@ type PeerConn struct {
        // we may not even know the number of pieces in the torrent yet.
        peerSentHaveAll bool
 
+       // TODO: How are pending cancels handled for webseed peers?
+       requestState requestStrategy.PeerRequestState
+
        peerRequestDataAllocLimiter alloclim.Limiter
 
        outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
index 5fc4a4fddda9fe3e7033cd6235a07bf0ba55c9da..c45a502b06d297958c919dfa782752d0033c2688 100644 (file)
@@ -83,7 +83,7 @@ type (
 
 type desiredPeerRequests struct {
        requestIndexes []RequestIndex
-       peer           *Peer
+       peer           *PeerConn
        pieceStates    []g.Option[requestStrategy.PieceRequestOrderState]
 }
 
@@ -185,7 +185,7 @@ func (t *Torrent) getRequestablePieces(f requestStrategy.RequestPieceFunc) {
 // This gets the best-case request state. That means handling pieces limited by capacity, preferring
 // earlier pieces, low availability etc. It pays no attention to existing requests on the peer or
 // other peers. Those are handled later.
-func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
        t := p.t
        if !t.haveInfo() {
                return
@@ -360,7 +360,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) {
                }
 
                existing := t.requestingPeer(req)
-               if existing != nil && existing != p.peerPtr() {
+               if existing != nil && existing != p {
                        // don't steal on cancel - because this is triggered by t.cancelRequest below
                        // which means that the cancelled can immediately try to steal back a request
                        // it has lost which can lead to circular cancel/add processing
index 6b2ba6b0de650989e646bfbe474d9c5ced383987..aab918a8c3696fb82937325335b5d4e04bde2988 100644 (file)
@@ -1471,18 +1471,21 @@ func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequ
                // Non-pending pieces are usually cancelled more synchronously.
                return
        }
-       t.iterPeers(func(c *Peer) {
-               if !c.isLowOnRequests() {
-                       return
-               }
-               if !c.peerHasPiece(piece) {
-                       return
-               }
-               if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
-                       return
-               }
-               c.onNeedUpdateRequests(reason)
-       })
+       for c := range t.conns {
+               // This is a lot of effort to avoid using continue...
+               func() {
+                       if !c.isLowOnRequests() {
+                               return
+                       }
+                       if !c.peerHasPiece(piece) {
+                               return
+                       }
+                       if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
+                               return
+                       }
+                       c.onNeedUpdateRequests(reason)
+               }()
+       }
 }
 
 // Stuff we don't want to run when the pending pieces change while benchmarking.
@@ -3133,7 +3136,7 @@ func (t *Torrent) updateComplete() {
        t.complete.SetBool(t.haveAllPieces())
 }
 
-func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
+func (t *Torrent) cancelRequest(r RequestIndex) *PeerConn {
        p := t.requestingPeer(r)
        if p != nil {
                p.cancel(r)
@@ -3146,7 +3149,7 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
        return p
 }
 
-func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
+func (t *Torrent) requestingPeer(r RequestIndex) *PeerConn {
        return t.requestState[r].peer
 }
 
@@ -3208,7 +3211,7 @@ func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports {
 }
 
 type requestState struct {
-       peer *Peer
+       peer *PeerConn
        when time.Time
 }
 
index 56e50e74e3ee0facbbf884d998228659e9aeb965..1a131f1271e0c31521119bb3d8e128d7bf65fecc 100644 (file)
@@ -32,6 +32,14 @@ type webseedPeer struct {
        hostKey          webseedHostKeyHandle
 }
 
+func (me *webseedPeer) cancelAllRequests() {
+       // Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we
+       // Close requests instead?
+       for req := range me.activeRequests {
+               req.Cancel()
+       }
+}
+
 func (me *webseedPeer) peerImplWriteStatus(w io.Writer) {}
 
 func (me *webseedPeer) isLowOnRequests() bool {
@@ -211,33 +219,6 @@ func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) {
        ws.peer.updateExpectingChunks()
 }
 
-func (ws *webseedPeer) iterConsecutiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
-       return func(yield func(RequestIndex) bool) {
-               for {
-                       if !ws.peer.requestState.Requests.Contains(begin) {
-                               return
-                       }
-                       if !yield(begin) {
-                               return
-                       }
-                       begin++
-               }
-       }
-}
-
-func (ws *webseedPeer) iterConsecutiveInactiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
-       return func(yield func(RequestIndex) bool) {
-               for req := range ws.iterConsecutiveRequests(begin) {
-                       if !ws.inactiveRequestIndex(req) {
-                               return
-                       }
-                       if !yield(req) {
-                               return
-                       }
-               }
-       }
-}
-
 func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool {
        for range ws.activeRequestsForIndex(index) {
                return false
@@ -245,24 +226,6 @@ func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool {
        return true
 }
 
-func (ws *webseedPeer) inactiveRequests() iter.Seq[RequestIndex] {
-       return func(yield func(RequestIndex) bool) {
-               // This is used to determine contiguity of requests.
-               //sorted := slices.Sorted(ws.peer.requestState.Requests.Iterator())
-               //if len(sorted) != 0 {
-               //      fmt.Println("inactiveRequests", sorted)
-               //}
-               for reqIndex := range ws.peer.requestState.Requests.Iterator() {
-                       if !ws.inactiveRequestIndex(reqIndex) {
-                               continue
-                       }
-                       if !yield(reqIndex) {
-                               return
-                       }
-               }
-       }
-}
-
 func (ws *webseedPeer) connectionFlags() string {
        return "WS"
 }