]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rename request updating stuff to better fit how it's actually used in preparation...
authorMatt Joiner <anacrolix@gmail.com>
Mon, 12 May 2025 23:26:39 +0000 (09:26 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 12 May 2025 23:26:39 +0000 (09:26 +1000)
client.go
peer-impl.go
peer.go
peerconn.go
request-strategy/order.go
requesting.go
torrent.go
webseed-peer.go

index fbd8a0d1ae41874f909a27eaa2a6844518b1d94e..f950c15a1f814f16f1710c1e4e616cf313bb897d 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1197,7 +1197,7 @@ func (c *Peer) updateRequestsTimerFunc() {
                torrent.Add("spurious timer requests updates", 1)
                return
        }
-       c.updateRequests(peerUpdateRequestsTimerReason)
+       c.onNeedUpdateRequests(peerUpdateRequestsTimerReason)
 }
 
 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
index 391bce681c477b4900ab87c1fb6d861cf238121c..5a348fb157028a0b9e9539a443b5529d37676926 100644 (file)
@@ -13,8 +13,10 @@ import (
 // newHotPeerImpl.
 type legacyPeerImpl interface {
        // Trigger the actual request state to get updated
-       handleUpdateRequests()
+       handleOnNeedUpdateRequests()
        writeInterested(interested bool) bool
+       // Actually go ahead and modify the pending requests.
+       updateRequests()
 
        // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
        // handled by a follow-up event.
diff --git a/peer.go b/peer.go
index 7a9395314ecbd48fd6cab1286b9b9500f0e0020c..b32bef139e59bdb2cc3632e6a7b9c8cad7216ee9 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -458,7 +458,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        return nil
 }
 
-func (cn *Peer) mustRequest(r RequestIndex) bool {
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
        more, err := cn.request(r)
        if err != nil {
                panic(err)
@@ -466,7 +466,7 @@ func (cn *Peer) mustRequest(r RequestIndex) bool {
        return more
 }
 
-func (cn *Peer) request(r RequestIndex) (more bool, err error) {
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
@@ -482,13 +482,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        }
        cn.validReceiveChunks[r]++
        cn.t.requestState[r] = requestState{
-               peer: cn,
+               peer: &cn.Peer,
                when: time.Now(),
        }
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
-               f(PeerRequestEvent{cn, ppReq})
+               f(PeerRequestEvent{&cn.Peer, ppReq})
        }
        return cn.legacyPeerImpl._request(ppReq), nil
 }
@@ -505,17 +505,17 @@ func (me *Peer) cancel(r RequestIndex) {
        }
        me.decPeakRequests()
        if me.isLowOnRequests() {
-               me.updateRequests(peerUpdateRequestsPeerCancelReason)
+               me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
        }
 }
 
 // Sets a reason to update requests, and if there wasn't already one, handle it.
-func (cn *Peer) updateRequests(reason updateRequestReason) {
+func (cn *Peer) onNeedUpdateRequests(reason updateRequestReason) {
        if cn.needRequestUpdate != "" {
                return
        }
        cn.needRequestUpdate = reason
-       cn.handleUpdateRequests()
+       cn.handleOnNeedUpdateRequests()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -599,7 +599,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
                return false
        }
        if c.isLowOnRequests() {
-               c.updateRequests(peerUpdateRequestsRemoteRejectReason)
+               c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
        }
        c.decExpectedChunkReceive(r)
        return true
@@ -672,7 +672,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                                c._chunksReceivedWhileExpecting++
                        }
                        if c.isLowOnRequests() {
-                               c.updateRequests("Peer.receiveChunk deleted request")
+                               c.onNeedUpdateRequests("Peer.receiveChunk deleted request")
                        }
                } else {
                        ChunksReceived.Add("unintended", 1)
@@ -740,7 +740,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
                // request update runs while we're writing the chunk that just failed. Then we never do a
                // fresh update after pending the failed request.
-               c.updateRequests("Peer.receiveChunk error writing chunk")
+               c.onNeedUpdateRequests("Peer.receiveChunk error writing chunk")
                t.onWriteChunkErr(err)
                return nil
        }
@@ -805,7 +805,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        delete(c.t.requestState, r)
        // c.t.iterPeers(func(p *Peer) {
        //      if p.isLowOnRequests() {
-       //              p.updateRequests("Peer.deleteRequest")
+       //              p.onNeedUpdateRequests("Peer.deleteRequest")
        //      }
        // })
        return true
@@ -824,7 +824,7 @@ func (c *Peer) deleteAllRequests(reason updateRequestReason) {
        c.assertNoRequests()
        c.t.iterPeers(func(p *Peer) {
                if p.isLowOnRequests() {
-                       p.updateRequests(reason)
+                       p.onNeedUpdateRequests(reason)
                }
        })
 }
index 27fa820f4e0f5b7b715730522fb23a3fd5adc5bb..94ec1f4e085090974bdc98a83ab1478a6f7b44e7 100644 (file)
@@ -391,7 +391,7 @@ func (cn *PeerConn) postBitfield() {
        cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
 }
 
-func (cn *PeerConn) handleUpdateRequests() {
+func (cn *PeerConn) handleOnNeedUpdateRequests() {
        // The writer determines the request state as needed when it can write.
        cn.tickleWriter()
 }
@@ -415,7 +415,7 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
        }
        cn._peerPieces.Add(uint32(piece))
        if cn.t.wantPieceIndex(piece) {
-               cn.updateRequests("have")
+               cn.onNeedUpdateRequests("have")
        }
        cn.peerPiecesChanged()
        return nil
@@ -471,7 +471,7 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
        // as or.
        cn._peerPieces.Xor(&bm)
        if shouldUpdateRequests {
-               cn.updateRequests("bitfield")
+               cn.onNeedUpdateRequests("bitfield")
        }
        // We didn't guard this before, I see no reason to do it now.
        cn.peerPiecesChanged()
@@ -498,7 +498,7 @@ func (cn *PeerConn) onPeerHasAllPieces() {
 
 func (cn *PeerConn) peerHasAllPiecesTriggers() {
        if !cn.t._pendingPieces.IsEmpty() {
-               cn.updateRequests("Peer.onPeerHasAllPieces")
+               cn.onNeedUpdateRequests("Peer.onPeerHasAllPieces")
        }
        cn.peerPiecesChanged()
 }
@@ -825,7 +825,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                                torrent.Add("requestsPreservedThroughChoking", int64(preservedCount))
                        }
                        if !c.t._pendingPieces.IsEmpty() {
-                               c.updateRequests("unchoked")
+                               c.onNeedUpdateRequests("unchoked")
                        }
                        c.updateExpectingChunks()
                case pp.Interested:
@@ -876,7 +876,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.Suggest:
                        torrent.Add("suggests received", 1)
                        log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).LogLevel(log.Debug, c.t.logger)
-                       c.updateRequests("suggested")
+                       c.onNeedUpdateRequests("suggested")
                case pp.HaveAll:
                        err = c.onPeerSentHaveAll()
                case pp.HaveNone:
@@ -890,7 +890,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger)
-                       c.updateRequests("PeerConn.mainReadLoop allowed fast")
+                       c.onNeedUpdateRequests("PeerConn.mainReadLoop allowed fast")
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                case pp.Hashes:
index 6aa1b402bd4bfe9d216ec7335bf339e5fe5fa773..b0686aff470184bc7e99de6800e4d5ca61a1860d 100644 (file)
@@ -42,7 +42,8 @@ func pieceOrderLess(i, j *PieceRequestOrderItem) multiless.Computation {
 // Calls f with requestable pieces in order.
 func GetRequestablePieces(
        input Input, pro *PieceRequestOrder,
-       // Returns true if the piece should be considered against the unverified bytes limit.
+       // Returns true if the piece should be considered against the unverified bytes limit. This is
+       // based on whether the callee intends to request from the piece.
        requestPiece func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState) bool,
 ) {
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
index 7d32012edd231d12ee20aec20e7b8df58ef0f24d..bedb07be8e4ade6ba4196001c8b933d8a471cb37 100644 (file)
@@ -168,7 +168,10 @@ type desiredRequestState struct {
        Interested bool
 }
 
-func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+// This gets the best-case request state. That means handling pieces limited by capacity, preferring
+// earlier pieces, low availability etc. It pays no attention to existing requests on the peer or
+// other peers. Those are handled later.
+func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
        t := p.t
        if !t.haveInfo() {
                return
@@ -181,7 +184,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
        }
        input := t.getRequestStrategyInput()
        requestHeap := desiredPeerRequests{
-               peer:           p,
+               peer:           &p.Peer,
                pieceStates:    t.requestPieceStates,
                requestIndexes: t.requestIndexes,
        }
@@ -249,11 +252,20 @@ func (p *Peer) maybeUpdateActualRequestState() {
                context.Background(),
                pprof.Labels("update request", string(p.needRequestUpdate)),
                func(_ context.Context) {
-                       next := p.getDesiredRequestState()
-                       p.applyRequestState(next)
-                       p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
+                       p.updateRequests()
                },
        )
+       p.needRequestUpdate = ""
+       p.lastRequestUpdate = time.Now()
+       if enableUpdateRequestsTimer {
+               p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
+       }
+}
+
+func (p *PeerConn) updateRequests() {
+       next := p.getDesiredRequestState()
+       p.applyRequestState(next)
+       p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
 }
 
 func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
@@ -280,8 +292,9 @@ func (p *Peer) allowSendNotInterested() bool {
        return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
 }
 
-// Transmit/action the request state to the peer.
-func (p *Peer) applyRequestState(next desiredRequestState) {
+// Transmit/action the request state to the peer. This includes work-stealing from other peers and
+// some piece order randomization within the preferred state calculated earlier in next.
+func (p *PeerConn) applyRequestState(next desiredRequestState) {
        current := &p.requestState
        // Make interest sticky
        if !next.Interested && p.requestState.Interested {
@@ -324,7 +337,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
                }
 
                existing := t.requestingPeer(req)
-               if existing != nil && existing != p {
+               if existing != nil && existing != &p.Peer {
                        // don't steal on cancel - because this is triggered by t.cancelRequest below
                        // which means that the cancelled can immediately try to steal back a request
                        // it has lost which can lead to circular cancel/add processing
@@ -359,11 +372,6 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
        //      "requests %v->%v (peak %v->%v) reason %q (peer %v)",
        //      originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
        p.peakRequests = newPeakRequests
-       p.needRequestUpdate = ""
-       p.lastRequestUpdate = time.Now()
-       if enableUpdateRequestsTimer {
-               p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
-       }
 }
 
 // This could be set to 10s to match the unchoke/request update interval recommended by some
index c8775d7ac3cf4c13ff1a0c2668a19d2cc4c7c7bd..eb32e6541157102ea2ab072c19eabc0c29fe6683 100644 (file)
@@ -575,7 +575,7 @@ func (t *Torrent) onSetInfo() {
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
-               p.updateRequests("onSetInfo")
+               p.onNeedUpdateRequests("onSetInfo")
        })
 }
 
@@ -1464,7 +1464,7 @@ func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequ
                if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
                        return
                }
-               c.updateRequests(reason)
+               c.onNeedUpdateRequests(reason)
        })
 }
 
@@ -2597,7 +2597,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        // }
        t.iterPeers(func(conn *Peer) {
                if conn.peerHasPiece(piece) {
-                       conn.updateRequests("piece incomplete")
+                       conn.onNeedUpdateRequests("piece incomplete")
                }
        })
 }
@@ -2883,7 +2883,7 @@ func (t *Torrent) disallowDataDownloadLocked() {
        t.dataDownloadDisallowed.Set()
        t.iterPeers(func(p *Peer) {
                // Could check if peer request state is empty/not interested?
-               p.updateRequests("disallow data download")
+               p.onNeedUpdateRequests("disallow data download")
                p.cancelAllRequests()
        })
 }
@@ -2893,7 +2893,7 @@ func (t *Torrent) AllowDataDownload() {
        defer t.cl.unlock()
        t.dataDownloadDisallowed.Clear()
        t.iterPeers(func(p *Peer) {
-               p.updateRequests("allow data download")
+               p.onNeedUpdateRequests("allow data download")
        })
 }
 
@@ -2903,7 +2903,7 @@ func (t *Torrent) AllowDataUpload() {
        defer t.cl.unlock()
        t.dataUploadDisallowed = false
        t.iterPeers(func(p *Peer) {
-               p.updateRequests("allow data upload")
+               p.onNeedUpdateRequests("allow data upload")
        })
 }
 
@@ -2914,7 +2914,7 @@ func (t *Torrent) DisallowDataUpload() {
        t.dataUploadDisallowed = true
        for c := range t.conns {
                // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
-               c.updateRequests("disallow data upload")
+               c.onNeedUpdateRequests("disallow data upload")
        }
 }
 
@@ -3022,7 +3022,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
                ws.onGotInfo(t.info)
        }
        t.webSeeds[url] = &ws.peer
-       ws.peer.updateRequests("Torrent.addWebSeed")
+       ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
        return true
 }
 
index 251d6fe94196bb5569bfa0e9bf7ad0e074a21fc0..19a055a64e0ae48e8fa775ec255eb7431186fc1f 100644 (file)
@@ -30,6 +30,11 @@ type webseedPeer struct {
        lastUnhandledErr time.Time
 }
 
+func (me *webseedPeer) updateRequests() {
+       //TODO implement me
+       panic("implement me")
+}
+
 func (me *webseedPeer) lastWriteUploadRate() float64 {
        // We never upload to webseeds.
        return 0
@@ -108,7 +113,7 @@ func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) {
        // Delete this entry after waiting above on an error, to prevent more requests.
        delete(ws.activeRequests, r)
        if err != nil {
-               ws.peer.updateRequests("webseedPeer request errored")
+               ws.peer.onNeedUpdateRequests("webseedPeer request errored")
        }
        ws.spawnRequests()
        locker.Unlock()
@@ -151,7 +156,7 @@ func (cn *webseedPeer) ban() {
        cn.peer.close()
 }
 
-func (ws *webseedPeer) handleUpdateRequests() {
+func (ws *webseedPeer) handleOnNeedUpdateRequests() {
        // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
        // pieces.
        go func() {
@@ -167,7 +172,7 @@ func (ws *webseedPeer) onClose() {
        ws.peer.cancelAllRequests()
        ws.peer.t.iterPeers(func(p *Peer) {
                if p.isLowOnRequests() {
-                       p.updateRequests("webseedPeer.onClose")
+                       p.onNeedUpdateRequests("webseedPeer.onClose")
                }
        })
 }