]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add checkReceivedChunk per-Peer impl
authorMatt Joiner <anacrolix@gmail.com>
Thu, 26 Jun 2025 08:18:45 +0000 (18:18 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 26 Jun 2025 12:51:07 +0000 (22:51 +1000)
peer-impl.go
peer.go
peerconn.go
requesting.go
webseed-peer.go

index f3e13077fb0b1d6da4678fbd099c2a55dd7d4eff..76542352724fc987249a46a60cb02ec44739f0e7 100644 (file)
@@ -48,4 +48,5 @@ type newHotPeerImpl interface {
        lastWriteUploadRate() float64
        // How many requests should be assigned to the peer.
        nominalMaxRequests() maxRequests
+       checkReceivedChunk(ri RequestIndex) error
 }
diff --git a/peer.go b/peer.go
index 70459d805374a70f17e48892fb88fd978cb43a02..370c3d32fa2295f604f9baa05e7693f6afdd3794 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -72,14 +72,6 @@ type (
                cumulativeExpectedToReceiveChunks   time.Duration
 
                choking bool
-               // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
-               // and implementation differences, we may receive chunks that are no longer in the set of
-               // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
-               validReceiveChunks map[RequestIndex]int
-               // Indexed by metadata piece, set to true if posted and pending a
-               // response.
-               metadataRequests []bool
-               sentHaves        bitmap.Bitmap
 
                // Stuff controlled by the remote peer.
                peerInterested        bool
@@ -453,7 +445,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        return nil
 }
 
-func (cn *Peer) mustRequest(r RequestIndex) bool {
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
        more, err := cn.request(r)
        if err != nil {
                panic(err)
@@ -461,7 +453,7 @@ func (cn *Peer) mustRequest(r RequestIndex) bool {
        return more
 }
 
-func (cn *Peer) request(r RequestIndex) (more bool, err error) {
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
@@ -477,13 +469,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        }
        cn.validReceiveChunks[r]++
        cn.t.requestState[r] = requestState{
-               peer: cn,
+               peer: cn.peerPtr(),
                when: time.Now(),
        }
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
-               f(PeerRequestEvent{cn, ppReq})
+               f(PeerRequestEvent{cn.peerPtr(), ppReq})
        }
        return cn.legacyPeerImpl._request(ppReq), nil
 }
@@ -589,7 +581,7 @@ func runSafeExtraneous(f func()) {
 }
 
 // Returns true if it was valid to reject the request.
-func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
        if c.deleteRequest(r) {
                c.decPeakRequests()
        } else if !c.requestState.Cancelled.CheckedRemove(r) {
@@ -603,7 +595,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
        return true
 }
 
-func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
+func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
        count := c.validReceiveChunks[r]
        if count == 1 {
                delete(c.validReceiveChunks, r)
@@ -636,19 +628,18 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                c.recordBlockForSmartBan(req, msg.Piece)
        })
        // This needs to occur before we return, but we try to do it when the client is unlocked. It
-       // can't be done before checking if chunks are valid because they won't be deallocated by piece
-       // hashing if they're out of bounds.
+       // can't be done before checking if chunks are valid because they won't be deallocated from the
+       // smart ban cache by piece hashing if they're out of bounds.
        defer recordBlockForSmartBan()
 
        if c.peerChoking {
                ChunksReceived.Add("while choked", 1)
        }
 
-       if c.validReceiveChunks[req] <= 0 {
-               ChunksReceived.Add("unexpected", 1)
-               return errors.New("received unexpected chunk")
+       err = c.peerImpl.checkReceivedChunk(req)
+       if err != nil {
+               return err
        }
-       c.decExpectedChunkReceive(req)
 
        if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
                ChunksReceived.Add("due to allowed fast", 1)
index d72e00f3c5d64fb74434c7bf2cb340498b18d85a..340196f14cdf139b66a89bdf47b9f4c223c768f4 100644 (file)
@@ -43,6 +43,14 @@ type PeerStatus struct {
 type PeerConn struct {
        Peer
 
+       // Indexed by metadata piece, set to true if posted and pending a response.
+       metadataRequests []bool
+       sentHaves        bitmap.Bitmap
+       // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+       // and implementation differences, we may receive chunks that are no longer in the set of
+       // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
+       validReceiveChunks map[RequestIndex]int
+
        // Move to PeerConn?
        protocolLogger log.Logger
 
@@ -1468,3 +1476,12 @@ func (me *PeerConn) setPeerLoggers(a log.Logger, s *slog.Logger) {
        me.Peer.slogger = s.With(fmt.Sprintf("%T", me), fmt.Sprintf("%p", me))
        me.protocolLogger = me.logger.WithNames(protocolLoggingName)
 }
+
+func (c *PeerConn) checkReceivedChunk(req RequestIndex) error {
+       if c.validReceiveChunks[req] <= 0 {
+               ChunksReceived.Add("unexpected", 1)
+               return errors.New("received unexpected chunk")
+       }
+       c.decExpectedChunkReceive(req)
+       return nil
+}
index 26e56e4572e7cb19b2246fee957d951ee2efe9df..a8c6fe3cecde958dbfcbb833fddd3d6aee1efe86 100644 (file)
@@ -316,7 +316,7 @@ func (p *Peer) allowSendNotInterested() bool {
 // Transmit/action the request state to the peer. This includes work-stealing from other peers and
 // some piece order randomization within the preferred state calculated earlier in next. Cancels are
 // not done here, those are handled synchronously. We only track pending cancel acknowledgements.
-func (p *Peer) applyRequestState(next desiredRequestState) {
+func (p *PeerConn) applyRequestState(next desiredRequestState) {
        current := &p.requestState
        // Make interest sticky
        if !next.Interested && p.requestState.Interested {
@@ -359,7 +359,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
                }
 
                existing := t.requestingPeer(req)
-               if existing != nil && existing != p {
+               if existing != nil && existing != p.peerPtr() {
                        // don't steal on cancel - because this is triggered by t.cancelRequest below
                        // which means that the cancelled can immediately try to steal back a request
                        // it has lost which can lead to circular cancel/add processing
index 0ad53d8c8bfafe2b5fcca7f1f79d2632da212d69..ed49edecbfb4650efa6d5ab0890026fd52e76c20 100644 (file)
@@ -29,6 +29,10 @@ type webseedPeer struct {
        hostKey          webseedHostKeyHandle
 }
 
+func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
+       return nil
+}
+
 func (me *webseedPeer) nominalMaxRequests() maxRequests {
        // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
        // we just allow 2 MiB worth of requests.
@@ -54,15 +58,6 @@ func (me *webseedPeer) moreRequestsAllowed() bool {
 
 func (me *webseedPeer) updateRequests() {
        return
-       if !me.shouldUpdateRequests() {
-               return
-       }
-       p := &me.peer
-       next := p.getDesiredRequestState()
-       p.applyRequestState(next)
-       p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
-       // Run this after all requests applied to the peer, so they can be batched up.
-       me.spawnRequests()
 }
 
 func (me *webseedPeer) lastWriteUploadRate() float64 {