From 3e716d089d1125af0fb8801e53e74bbfe60b30af Mon Sep 17 00:00:00 2001
From: Matt Joiner
Date: Thu, 26 Jun 2025 18:18:45 +1000
Subject: [PATCH] Add checkReceivedChunk per-Peer impl

---
 peer-impl.go    |  1 +
 peer.go         | 31 +++++++++++--------------------
 peerconn.go     | 17 +++++++++++++++++
 requesting.go   |  4 ++--
 webseed-peer.go | 13 ++++---------
 5 files changed, 35 insertions(+), 31 deletions(-)

diff --git a/peer-impl.go b/peer-impl.go
index f3e13077..76542352 100644
--- a/peer-impl.go
+++ b/peer-impl.go
@@ -48,4 +48,5 @@ type newHotPeerImpl interface {
 	lastWriteUploadRate() float64
 	// How many requests should be assigned to the peer.
 	nominalMaxRequests() maxRequests
+	checkReceivedChunk(ri RequestIndex) error
 }
diff --git a/peer.go b/peer.go
index 70459d80..370c3d32 100644
--- a/peer.go
+++ b/peer.go
@@ -72,14 +72,6 @@ type (
 		cumulativeExpectedToReceiveChunks time.Duration
 
 		choking bool
-		// Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
-		// and implementation differences, we may receive chunks that are no longer in the set of
-		// requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
-		validReceiveChunks map[RequestIndex]int
-		// Indexed by metadata piece, set to true if posted and pending a
-		// response.
-		metadataRequests []bool
-		sentHaves bitmap.Bitmap
 
 		// Stuff controlled by the remote peer.
 		peerInterested bool
@@ -453,7 +445,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
 	return nil
 }
 
-func (cn *Peer) mustRequest(r RequestIndex) bool {
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
 	more, err := cn.request(r)
 	if err != nil {
 		panic(err)
@@ -461,7 +453,7 @@ func (cn *Peer) mustRequest(r RequestIndex) bool {
 	return more
 }
 
-func (cn *Peer) request(r RequestIndex) (more bool, err error) {
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
 	if err := cn.shouldRequest(r); err != nil {
 		panic(err)
 	}
@@ -477,13 +469,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
 	}
 	cn.validReceiveChunks[r]++
 	cn.t.requestState[r] = requestState{
-		peer: cn,
+		peer: cn.peerPtr(),
 		when: time.Now(),
 	}
 	cn.updateExpectingChunks()
 	ppReq := cn.t.requestIndexToRequest(r)
 	for _, f := range cn.callbacks.SentRequest {
-		f(PeerRequestEvent{cn, ppReq})
+		f(PeerRequestEvent{cn.peerPtr(), ppReq})
 	}
 	return cn.legacyPeerImpl._request(ppReq), nil
 }
@@ -589,7 +581,7 @@ func runSafeExtraneous(f func()) {
 }
 
 // Returns true if it was valid to reject the request.
-func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
 	if c.deleteRequest(r) {
 		c.decPeakRequests()
 	} else if !c.requestState.Cancelled.CheckedRemove(r) {
@@ -603,7 +595,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
 	return true
 }
 
-func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
+func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
 	count := c.validReceiveChunks[r]
 	if count == 1 {
 		delete(c.validReceiveChunks, r)
@@ -636,19 +628,18 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 		c.recordBlockForSmartBan(req, msg.Piece)
 	})
 	// This needs to occur before we return, but we try to do it when the client is unlocked. It
-	// can't be done before checking if chunks are valid because they won't be deallocated by piece
-	// hashing if they're out of bounds.
+	// can't be done before checking if chunks are valid because they won't be deallocated from the
+	// smart ban cache by piece hashing if they're out of bounds.
 	defer recordBlockForSmartBan()
 
 	if c.peerChoking {
 		ChunksReceived.Add("while choked", 1)
 	}
 
-	if c.validReceiveChunks[req] <= 0 {
-		ChunksReceived.Add("unexpected", 1)
-		return errors.New("received unexpected chunk")
+	err = c.peerImpl.checkReceivedChunk(req)
+	if err != nil {
+		return err
 	}
-	c.decExpectedChunkReceive(req)
 
 	if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
 		ChunksReceived.Add("due to allowed fast", 1)
diff --git a/peerconn.go b/peerconn.go
index d72e00f3..340196f1 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -43,6 +43,14 @@ type PeerStatus struct {
 
 type PeerConn struct {
 	Peer
 
+	// Indexed by metadata piece, set to true if posted and pending a response.
+	metadataRequests []bool
+	sentHaves bitmap.Bitmap
+	// Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+	// and implementation differences, we may receive chunks that are no longer in the set of
+	// requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
+	validReceiveChunks map[RequestIndex]int
+
 	// Move to PeerConn?
 	protocolLogger log.Logger
@@ -1468,3 +1476,12 @@ func (me *PeerConn) setPeerLoggers(a log.Logger, s *slog.Logger) {
 	me.Peer.slogger = s.With(fmt.Sprintf("%T", me), fmt.Sprintf("%p", me))
 	me.protocolLogger = me.logger.WithNames(protocolLoggingName)
 }
+
+func (c *PeerConn) checkReceivedChunk(req RequestIndex) error {
+	if c.validReceiveChunks[req] <= 0 {
+		ChunksReceived.Add("unexpected", 1)
+		return errors.New("received unexpected chunk")
+	}
+	c.decExpectedChunkReceive(req)
+	return nil
+}
diff --git a/requesting.go b/requesting.go
index 26e56e45..a8c6fe3c 100644
--- a/requesting.go
+++ b/requesting.go
@@ -316,7 +316,7 @@ func (p *Peer) allowSendNotInterested() bool {
 // Transmit/action the request state to the peer. This includes work-stealing from other peers and
 // some piece order randomization within the preferred state calculated earlier in next. Cancels are
 // not done here, those are handled synchronously. We only track pending cancel acknowledgements.
-func (p *Peer) applyRequestState(next desiredRequestState) {
+func (p *PeerConn) applyRequestState(next desiredRequestState) {
 	current := &p.requestState
 	// Make interest sticky
 	if !next.Interested && p.requestState.Interested {
@@ -359,7 +359,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
 		}
 
 		existing := t.requestingPeer(req)
-		if existing != nil && existing != p {
+		if existing != nil && existing != p.peerPtr() {
 			// don't steal on cancel - because this is triggered by t.cancelRequest below
 			// which means that the cancelled can immediately try to steal back a request
 			// it has lost which can lead to circular cancel/add processing
diff --git a/webseed-peer.go b/webseed-peer.go
index 0ad53d8c..ed49edec 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -29,6 +29,10 @@ type webseedPeer struct {
 	hostKey webseedHostKeyHandle
 }
 
+func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
+	return nil
+}
+
 func (me *webseedPeer) nominalMaxRequests() maxRequests {
 	// TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
 	// we just allow 2 MiB worth of requests.
@@ -54,15 +58,6 @@ func (me *webseedPeer) moreRequestsAllowed() bool {
 
 func (me *webseedPeer) updateRequests() {
 	return
-	if !me.shouldUpdateRequests() {
-		return
-	}
-	p := &me.peer
-	next := p.getDesiredRequestState()
-	p.applyRequestState(next)
-	p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
-	// Run this after all requests applied to the peer, so they can be batched up.
-	me.spawnRequests()
 }
 
 func (me *webseedPeer) lastWriteUploadRate() float64 {
-- 
2.51.0
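For readers following the interface change: checkReceivedChunk lets each peer implementation decide whether an incoming chunk was actually expected, with PeerConn keeping the validReceiveChunks bookkeeping and webseedPeer accepting everything. The standalone Go sketch below only illustrates that shape; the names chunkReceiver, connPeer, and webseedLikePeer are invented for the example and are not part of the torrent package.

package main

import (
	"errors"
	"fmt"
)

// RequestIndex stands in for the torrent package's request index type (an assumption for this sketch).
type RequestIndex uint32

// chunkReceiver mirrors the shape of the checkReceivedChunk method added to newHotPeerImpl above.
type chunkReceiver interface {
	checkReceivedChunk(ri RequestIndex) error
}

// connPeer is a hypothetical stand-in for PeerConn: it counts outstanding requests per index and
// rejects chunks it never asked for, decrementing the expectation count otherwise.
type connPeer struct {
	validReceiveChunks map[RequestIndex]int
}

func (c *connPeer) checkReceivedChunk(ri RequestIndex) error {
	if c.validReceiveChunks[ri] <= 0 {
		return errors.New("received unexpected chunk")
	}
	if c.validReceiveChunks[ri] == 1 {
		delete(c.validReceiveChunks, ri)
	} else {
		c.validReceiveChunks[ri]--
	}
	return nil
}

// webseedLikePeer is a hypothetical stand-in for webseedPeer: every chunk is accepted.
type webseedLikePeer struct{}

func (webseedLikePeer) checkReceivedChunk(RequestIndex) error { return nil }

func main() {
	peers := []chunkReceiver{
		&connPeer{validReceiveChunks: map[RequestIndex]int{7: 1}},
		webseedLikePeer{},
	}
	for _, p := range peers {
		// The conn-like peer accepts index 7 once and rejects index 9; the webseed-like peer accepts both.
		fmt.Printf("%T: chunk 7 -> %v, chunk 9 -> %v\n", p, p.checkReceivedChunk(7), p.checkReceivedChunk(9))
	}
}

Returning nil from the webseed side mirrors the patch itself: once validReceiveChunks moves off Peer, only PeerConn tracks expected chunks.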