From 860a1785ef30ded4f6846165aecaae7ae8b5f806 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 15 Jul 2025 16:36:01 +1000 Subject: [PATCH] Move requestState into PeerConn so cancelAllRequests doesn't crash --- peer-impl.go | 1 + peer.go | 32 ++++++++++++++--------------- peerconn.go | 4 ++++ requesting.go | 6 +++--- torrent.go | 33 ++++++++++++++++-------------- webseed-peer.go | 53 ++++++++----------------------------------------- 6 files changed, 49 insertions(+), 80 deletions(-) diff --git a/peer-impl.go b/peer-impl.go index 4bae3f8c..aa0fe9fe 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -24,6 +24,7 @@ type legacyPeerImpl interface { // handleCancel initiates cancellation of a request handleCancel(ri RequestIndex) + cancelAllRequests() connectionFlags() string onClose() onGotInfo(info *metainfo.Info) diff --git a/peer.go b/peer.go index a9e13d1e..7a3b10e1 100644 --- a/peer.go +++ b/peer.go @@ -71,9 +71,7 @@ type ( lastChunkSent time.Time // Stuff controlled by the local peer. - needRequestUpdate updateRequestReason - // TODO: How are pending cancels handled for webseed peers? - requestState requestStrategy.PeerRequestState + needRequestUpdate updateRequestReason updateRequestsTimer *time.Timer lastRequestUpdate time.Time peakRequests maxRequests @@ -148,7 +146,7 @@ func (p *Peer) Stats() (ret PeerStats) { return } -func (p *Peer) initRequestState() { +func (p *PeerConn) initRequestState() { p.requestState.Requests = &peerRequests{} } @@ -187,7 +185,7 @@ func (cn *PeerConn) expectingChunks() bool { return haveAllowedFastRequests } -func (cn *Peer) cumInterest() time.Duration { +func (cn *PeerConn) cumInterest() time.Duration { ret := cn.priorInterest if cn.requestState.Interested { ret += time.Since(cn.lastBecameInterested) @@ -233,7 +231,7 @@ func eventAgeString(t time.Time) string { } // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text. -func (cn *Peer) statusFlags() (ret string) { +func (cn *PeerConn) statusFlags() (ret string) { c := func(b byte) { ret += string([]byte{b}) } @@ -410,7 +408,7 @@ type messageWriter func(pp.Message) bool // This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it // when we want to go fast. -func (cn *Peer) shouldRequest(r RequestIndex) error { +func (cn *PeerConn) shouldRequest(r RequestIndex) error { err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r)) if err != nil { return err @@ -422,7 +420,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { if !cn.peerHasPiece(pi) { return errors.New("requesting piece peer doesn't have") } - if !cn.t.peerIsActive(cn) { + if !cn.t.peerIsActive(cn.peerPtr()) { panic("requesting but not in active conns") } if cn.closed.IsSet() { @@ -472,7 +470,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) { } cn.validReceiveChunks[r]++ cn.t.requestState[r] = requestState{ - peer: cn.peerPtr(), + peer: cn, when: time.Now(), } cn.updateExpectingChunks() @@ -483,7 +481,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) { return cn._request(ppReq), nil } -func (me *Peer) cancel(r RequestIndex) { +func (me *PeerConn) cancel(r RequestIndex) { if !me.deleteRequest(r) { panic("request not existing should have been guarded") } @@ -669,7 +667,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Cancel pending requests for this chunk from *other* peers. if p := t.requestingPeer(req); p != nil { - if p == c { + if p.peerPtr() == c { p.logger.Slogger().Error("received chunk but still pending request", "peer", p, "req", req) panic("should not be pending request from conn that just received it") } @@ -750,12 +748,12 @@ func (c *Peer) peerHasWantedPieces() bool { // Returns true if an outstanding request is removed. Cancelled requests should be handled // separately. -func (c *Peer) deleteRequest(r RequestIndex) bool { +func (c *PeerConn) deleteRequest(r RequestIndex) bool { if !c.requestState.Requests.CheckedRemove(r) { return false } for _, f := range c.callbacks.DeletedRequest { - f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) + f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)}) } c.updateExpectingChunks() // TODO: Can't this happen if a request is stolen? @@ -771,7 +769,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { return true } -func (c *Peer) deleteAllRequests(reason updateRequestReason) { +func (c *PeerConn) deleteAllRequests(reason updateRequestReason) { if c.requestState.Requests.IsEmpty() { return } @@ -789,13 +787,13 @@ func (c *Peer) deleteAllRequests(reason updateRequestReason) { }) } -func (c *Peer) assertNoRequests() { +func (c *PeerConn) assertNoRequests() { if !c.requestState.Requests.IsEmpty() { panic(c.requestState.Requests.GetCardinality()) } } -func (c *Peer) cancelAllRequests() { +func (c *PeerConn) cancelAllRequests() { c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool { c.cancel(x) return true @@ -853,7 +851,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { return pc, ok } -func (p *Peer) uncancelledRequests() uint64 { +func (p *PeerConn) uncancelledRequests() uint64 { return p.requestState.Requests.GetCardinality() } diff --git a/peerconn.go b/peerconn.go index 5ccf23ce..a3e0a9d0 100644 --- a/peerconn.go +++ b/peerconn.go @@ -22,6 +22,7 @@ import ( "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" + requestStrategy "github.com/anacrolix/torrent/internal/request-strategy" "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" @@ -95,6 +96,9 @@ type PeerConn struct { // we may not even know the number of pieces in the torrent yet. peerSentHaveAll bool + // TODO: How are pending cancels handled for webseed peers? + requestState requestStrategy.PeerRequestState + peerRequestDataAllocLimiter alloclim.Limiter outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} diff --git a/requesting.go b/requesting.go index 5fc4a4fd..c45a502b 100644 --- a/requesting.go +++ b/requesting.go @@ -83,7 +83,7 @@ type ( type desiredPeerRequests struct { requestIndexes []RequestIndex - peer *Peer + peer *PeerConn pieceStates []g.Option[requestStrategy.PieceRequestOrderState] } @@ -185,7 +185,7 @@ func (t *Torrent) getRequestablePieces(f requestStrategy.RequestPieceFunc) { // This gets the best-case request state. That means handling pieces limited by capacity, preferring // earlier pieces, low availability etc. It pays no attention to existing requests on the peer or // other peers. Those are handled later. -func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { +func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) { t := p.t if !t.haveInfo() { return @@ -360,7 +360,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) { } existing := t.requestingPeer(req) - if existing != nil && existing != p.peerPtr() { + if existing != nil && existing != p { // don't steal on cancel - because this is triggered by t.cancelRequest below // which means that the cancelled can immediately try to steal back a request // it has lost which can lead to circular cancel/add processing diff --git a/torrent.go b/torrent.go index 6b2ba6b0..aab918a8 100644 --- a/torrent.go +++ b/torrent.go @@ -1471,18 +1471,21 @@ func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequ // Non-pending pieces are usually cancelled more synchronously. return } - t.iterPeers(func(c *Peer) { - if !c.isLowOnRequests() { - return - } - if !c.peerHasPiece(piece) { - return - } - if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { - return - } - c.onNeedUpdateRequests(reason) - }) + for c := range t.conns { + // This is a lot of effort to avoid using continue... + func() { + if !c.isLowOnRequests() { + return + } + if !c.peerHasPiece(piece) { + return + } + if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { + return + } + c.onNeedUpdateRequests(reason) + }() + } } // Stuff we don't want to run when the pending pieces change while benchmarking. @@ -3133,7 +3136,7 @@ func (t *Torrent) updateComplete() { t.complete.SetBool(t.haveAllPieces()) } -func (t *Torrent) cancelRequest(r RequestIndex) *Peer { +func (t *Torrent) cancelRequest(r RequestIndex) *PeerConn { p := t.requestingPeer(r) if p != nil { p.cancel(r) @@ -3146,7 +3149,7 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer { return p } -func (t *Torrent) requestingPeer(r RequestIndex) *Peer { +func (t *Torrent) requestingPeer(r RequestIndex) *PeerConn { return t.requestState[r].peer } @@ -3208,7 +3211,7 @@ func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports { } type requestState struct { - peer *Peer + peer *PeerConn when time.Time } diff --git a/webseed-peer.go b/webseed-peer.go index 56e50e74..1a131f12 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -32,6 +32,14 @@ type webseedPeer struct { hostKey webseedHostKeyHandle } +func (me *webseedPeer) cancelAllRequests() { + // Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we + // Close requests instead? + for req := range me.activeRequests { + req.Cancel() + } +} + func (me *webseedPeer) peerImplWriteStatus(w io.Writer) {} func (me *webseedPeer) isLowOnRequests() bool { @@ -211,33 +219,6 @@ func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) { ws.peer.updateExpectingChunks() } -func (ws *webseedPeer) iterConsecutiveRequests(begin RequestIndex) iter.Seq[RequestIndex] { - return func(yield func(RequestIndex) bool) { - for { - if !ws.peer.requestState.Requests.Contains(begin) { - return - } - if !yield(begin) { - return - } - begin++ - } - } -} - -func (ws *webseedPeer) iterConsecutiveInactiveRequests(begin RequestIndex) iter.Seq[RequestIndex] { - return func(yield func(RequestIndex) bool) { - for req := range ws.iterConsecutiveRequests(begin) { - if !ws.inactiveRequestIndex(req) { - return - } - if !yield(req) { - return - } - } - } -} - func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool { for range ws.activeRequestsForIndex(index) { return false @@ -245,24 +226,6 @@ func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool { return true } -func (ws *webseedPeer) inactiveRequests() iter.Seq[RequestIndex] { - return func(yield func(RequestIndex) bool) { - // This is used to determine contiguity of requests. - //sorted := slices.Sorted(ws.peer.requestState.Requests.Iterator()) - //if len(sorted) != 0 { - // fmt.Println("inactiveRequests", sorted) - //} - for reqIndex := range ws.peer.requestState.Requests.Iterator() { - if !ws.inactiveRequestIndex(reqIndex) { - continue - } - if !yield(reqIndex) { - return - } - } - } -} - func (ws *webseedPeer) connectionFlags() string { return "WS" } -- 2.51.0