From 18c9e6e2e890a6866f88b43edb84c6aae24c97ce Mon Sep 17 00:00:00 2001
From: Matt Joiner
Date: Tue, 13 May 2025 09:26:39 +1000
Subject: [PATCH] Rename request updating methods to better fit how they're
 actually used, in preparation for redoing webseeds

---
 client.go                 |  2 +-
 peer-impl.go              |  4 +++-
 peer.go                   | 24 ++++++++++++------------
 peerconn.go               | 14 +++++++-------
 request-strategy/order.go |  3 ++-
 requesting.go             | 34 +++++++++++++++++++++-------------
 torrent.go                | 16 ++++++++--------
 webseed-peer.go           | 11 ++++++++---
 8 files changed, 62 insertions(+), 46 deletions(-)

diff --git a/client.go b/client.go
index fbd8a0d1..f950c15a 100644
--- a/client.go
+++ b/client.go
@@ -1197,7 +1197,7 @@ func (c *Peer) updateRequestsTimerFunc() {
 		torrent.Add("spurious timer requests updates", 1)
 		return
 	}
-	c.updateRequests(peerUpdateRequestsTimerReason)
+	c.onNeedUpdateRequests(peerUpdateRequestsTimerReason)
 }
 
 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
diff --git a/peer-impl.go b/peer-impl.go
index 391bce68..5a348fb1 100644
--- a/peer-impl.go
+++ b/peer-impl.go
@@ -13,8 +13,10 @@ import (
 // newHotPeerImpl.
 type legacyPeerImpl interface {
 	// Trigger the actual request state to get updated
-	handleUpdateRequests()
+	handleOnNeedUpdateRequests()
 	writeInterested(interested bool) bool
+	// Actually go ahead and modify the pending requests.
+	updateRequests()
 
 	// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
 	// handled by a follow-up event.
diff --git a/peer.go b/peer.go
index 7a939531..b32bef13 100644
--- a/peer.go
+++ b/peer.go
@@ -458,7 +458,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
 	return nil
 }
 
-func (cn *Peer) mustRequest(r RequestIndex) bool {
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
 	more, err := cn.request(r)
 	if err != nil {
 		panic(err)
@@ -466,7 +466,7 @@ func (cn *Peer) mustRequest(r RequestIndex) bool {
 	return more
 }
 
-func (cn *Peer) request(r RequestIndex) (more bool, err error) {
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
 	if err := cn.shouldRequest(r); err != nil {
 		panic(err)
 	}
@@ -482,13 +482,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
 	}
 	cn.validReceiveChunks[r]++
 	cn.t.requestState[r] = requestState{
-		peer: cn,
+		peer: &cn.Peer,
 		when: time.Now(),
 	}
 	cn.updateExpectingChunks()
 	ppReq := cn.t.requestIndexToRequest(r)
 	for _, f := range cn.callbacks.SentRequest {
-		f(PeerRequestEvent{cn, ppReq})
+		f(PeerRequestEvent{&cn.Peer, ppReq})
 	}
 	return cn.legacyPeerImpl._request(ppReq), nil
 }
@@ -505,17 +505,17 @@ func (me *Peer) cancel(r RequestIndex) {
 	}
 	me.decPeakRequests()
 	if me.isLowOnRequests() {
-		me.updateRequests(peerUpdateRequestsPeerCancelReason)
+		me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
 	}
 }
 
 // Sets a reason to update requests, and if there wasn't already one, handle it.
-func (cn *Peer) updateRequests(reason updateRequestReason) {
+func (cn *Peer) onNeedUpdateRequests(reason updateRequestReason) {
 	if cn.needRequestUpdate != "" {
 		return
 	}
 	cn.needRequestUpdate = reason
-	cn.handleUpdateRequests()
+	cn.handleOnNeedUpdateRequests()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -599,7 +599,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
 		return false
 	}
 	if c.isLowOnRequests() {
-		c.updateRequests(peerUpdateRequestsRemoteRejectReason)
+		c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
 	}
 	c.decExpectedChunkReceive(r)
 	return true
@@ -672,7 +672,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 			c._chunksReceivedWhileExpecting++
 		}
 		if c.isLowOnRequests() {
-			c.updateRequests("Peer.receiveChunk deleted request")
+			c.onNeedUpdateRequests("Peer.receiveChunk deleted request")
 		}
 	} else {
 		ChunksReceived.Add("unintended", 1)
@@ -740,7 +740,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 		// Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
 		// request update runs while we're writing the chunk that just failed. Then we never do a
 		// fresh update after pending the failed request.
-		c.updateRequests("Peer.receiveChunk error writing chunk")
+		c.onNeedUpdateRequests("Peer.receiveChunk error writing chunk")
 		t.onWriteChunkErr(err)
 		return nil
 	}
@@ -805,7 +805,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
 	delete(c.t.requestState, r)
 	// c.t.iterPeers(func(p *Peer) {
 	// 	if p.isLowOnRequests() {
-	// 		p.updateRequests("Peer.deleteRequest")
+	// 		p.onNeedUpdateRequests("Peer.deleteRequest")
 	// 	}
 	// })
 	return true
@@ -824,7 +824,7 @@ func (c *Peer) deleteAllRequests(reason updateRequestReason) {
 	c.assertNoRequests()
 	c.t.iterPeers(func(p *Peer) {
 		if p.isLowOnRequests() {
-			p.updateRequests(reason)
+			p.onNeedUpdateRequests(reason)
 		}
 	})
 }
diff --git a/peerconn.go b/peerconn.go
index 27fa820f..94ec1f4e 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -391,7 +391,7 @@ func (cn *PeerConn) postBitfield() {
 	cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
 }
 
-func (cn *PeerConn) handleUpdateRequests() {
+func (cn *PeerConn) handleOnNeedUpdateRequests() {
 	// The writer determines the request state as needed when it can write.
 	cn.tickleWriter()
 }
@@ -415,7 +415,7 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
 	}
 	cn._peerPieces.Add(uint32(piece))
 	if cn.t.wantPieceIndex(piece) {
-		cn.updateRequests("have")
+		cn.onNeedUpdateRequests("have")
 	}
 	cn.peerPiecesChanged()
 	return nil
@@ -471,7 +471,7 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error {
 	// as or.
 	cn._peerPieces.Xor(&bm)
 	if shouldUpdateRequests {
-		cn.updateRequests("bitfield")
+		cn.onNeedUpdateRequests("bitfield")
 	}
 	// We didn't guard this before, I see no reason to do it now.
 	cn.peerPiecesChanged()
@@ -498,7 +498,7 @@ func (cn *PeerConn) onPeerHasAllPieces() {
 
 func (cn *PeerConn) peerHasAllPiecesTriggers() {
 	if !cn.t._pendingPieces.IsEmpty() {
-		cn.updateRequests("Peer.onPeerHasAllPieces")
+		cn.onNeedUpdateRequests("Peer.onPeerHasAllPieces")
 	}
 	cn.peerPiecesChanged()
 }
@@ -825,7 +825,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
 				torrent.Add("requestsPreservedThroughChoking", int64(preservedCount))
 			}
 			if !c.t._pendingPieces.IsEmpty() {
-				c.updateRequests("unchoked")
+				c.onNeedUpdateRequests("unchoked")
 			}
 			c.updateExpectingChunks()
 		case pp.Interested:
@@ -876,7 +876,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
 		case pp.Suggest:
 			torrent.Add("suggests received", 1)
 			log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).LogLevel(log.Debug, c.t.logger)
-			c.updateRequests("suggested")
+			c.onNeedUpdateRequests("suggested")
 		case pp.HaveAll:
 			err = c.onPeerSentHaveAll()
 		case pp.HaveNone:
@@ -890,7 +890,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
 		case pp.AllowedFast:
 			torrent.Add("allowed fasts received", 1)
 			log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger)
-			c.updateRequests("PeerConn.mainReadLoop allowed fast")
+			c.onNeedUpdateRequests("PeerConn.mainReadLoop allowed fast")
 		case pp.Extended:
 			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
 		case pp.Hashes:
diff --git a/request-strategy/order.go b/request-strategy/order.go
index 6aa1b402..b0686aff 100644
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -42,7 +42,8 @@ func pieceOrderLess(i, j *PieceRequestOrderItem) multiless.Computation {
 // Calls f with requestable pieces in order.
 func GetRequestablePieces(
 	input Input, pro *PieceRequestOrder,
-	// Returns true if the piece should be considered against the unverified bytes limit.
+	// Returns true if the piece should be considered against the unverified bytes limit. This is
+	// based on whether the callee intends to request from the piece.
 	requestPiece func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState) bool,
 ) {
 	// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
diff --git a/requesting.go b/requesting.go
index 7d32012e..bedb07be 100644
--- a/requesting.go
+++ b/requesting.go
@@ -168,7 +168,10 @@ type desiredRequestState struct {
 	Interested bool
 }
 
-func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+// This gets the best-case request state. That means handling pieces limited by capacity, preferring
+// earlier pieces, low availability etc. It pays no attention to existing requests on the peer or
+// other peers. Those are handled later.
+func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
 	t := p.t
 	if !t.haveInfo() {
 		return
@@ -181,7 +184,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
 	}
 	input := t.getRequestStrategyInput()
 	requestHeap := desiredPeerRequests{
-		peer:           p,
+		peer:           &p.Peer,
 		pieceStates:    t.requestPieceStates,
 		requestIndexes: t.requestIndexes,
 	}
@@ -249,11 +252,20 @@ func (p *Peer) maybeUpdateActualRequestState() {
 		context.Background(),
 		pprof.Labels("update request", string(p.needRequestUpdate)),
 		func(_ context.Context) {
-			next := p.getDesiredRequestState()
-			p.applyRequestState(next)
-			p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
+			p.updateRequests()
 		},
 	)
+	p.needRequestUpdate = ""
+	p.lastRequestUpdate = time.Now()
+	if enableUpdateRequestsTimer {
+		p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
+	}
+}
+
+func (p *PeerConn) updateRequests() {
+	next := p.getDesiredRequestState()
+	p.applyRequestState(next)
+	p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
 }
 
 func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
@@ -280,8 +292,9 @@ func (p *Peer) allowSendNotInterested() bool {
 	return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
 }
 
-// Transmit/action the request state to the peer.
-func (p *Peer) applyRequestState(next desiredRequestState) {
+// Transmit/action the request state to the peer. This includes work-stealing from other peers and
+// some piece order randomization within the preferred state calculated earlier in next.
+func (p *PeerConn) applyRequestState(next desiredRequestState) {
 	current := &p.requestState
 	// Make interest sticky
 	if !next.Interested && p.requestState.Interested {
@@ -324,7 +337,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
 		}
 
 		existing := t.requestingPeer(req)
-		if existing != nil && existing != p {
+		if existing != nil && existing != &p.Peer {
 			// don't steal on cancel - because this is triggered by t.cancelRequest below
 			// which means that the cancelled can immediately try to steal back a request
 			// it has lost which can lead to circular cancel/add processing
@@ -359,11 +372,6 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
 	// "requests %v->%v (peak %v->%v) reason %q (peer %v)",
 	// originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
 	p.peakRequests = newPeakRequests
-	p.needRequestUpdate = ""
-	p.lastRequestUpdate = time.Now()
-	if enableUpdateRequestsTimer {
-		p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
-	}
 }
 
 // This could be set to 10s to match the unchoke/request update interval recommended by some
diff --git a/torrent.go b/torrent.go
index c8775d7a..eb32e654 100644
--- a/torrent.go
+++ b/torrent.go
@@ -575,7 +575,7 @@ func (t *Torrent) onSetInfo() {
 	t.tryCreateMorePieceHashers()
 	t.iterPeers(func(p *Peer) {
 		p.onGotInfo(t.info)
-		p.updateRequests("onSetInfo")
+		p.onNeedUpdateRequests("onSetInfo")
 	})
 }
 
@@ -1464,7 +1464,7 @@ func (t *Torrent) updatePeerRequestsForPiece(piece pieceIndex, reason updateRequ
 		if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
 			return
 		}
-		c.updateRequests(reason)
+		c.onNeedUpdateRequests(reason)
 	})
 }
 
@@ -2597,7 +2597,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
 	// }
 	t.iterPeers(func(conn *Peer) {
 		if conn.peerHasPiece(piece) {
-			conn.updateRequests("piece incomplete")
+			conn.onNeedUpdateRequests("piece incomplete")
 		}
 	})
 }
@@ -2883,7 +2883,7 @@ func (t *Torrent) disallowDataDownloadLocked() {
 	t.dataDownloadDisallowed.Set()
 	t.iterPeers(func(p *Peer) {
 		// Could check if peer request state is empty/not interested?
-		p.updateRequests("disallow data download")
+		p.onNeedUpdateRequests("disallow data download")
 		p.cancelAllRequests()
 	})
 }
@@ -2893,7 +2893,7 @@ func (t *Torrent) AllowDataDownload() {
 	defer t.cl.unlock()
 	t.dataDownloadDisallowed.Clear()
 	t.iterPeers(func(p *Peer) {
-		p.updateRequests("allow data download")
+		p.onNeedUpdateRequests("allow data download")
 	})
 }
 
@@ -2903,7 +2903,7 @@ func (t *Torrent) AllowDataUpload() {
 	defer t.cl.unlock()
 	t.dataUploadDisallowed = false
 	t.iterPeers(func(p *Peer) {
-		p.updateRequests("allow data upload")
+		p.onNeedUpdateRequests("allow data upload")
 	})
 }
 
@@ -2914,7 +2914,7 @@ func (t *Torrent) DisallowDataUpload() {
 	t.dataUploadDisallowed = true
 	for c := range t.conns {
 		// TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
-		c.updateRequests("disallow data upload")
+		c.onNeedUpdateRequests("disallow data upload")
 	}
 }
 
@@ -3022,7 +3022,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
 		ws.onGotInfo(t.info)
 	}
 	t.webSeeds[url] = &ws.peer
-	ws.peer.updateRequests("Torrent.addWebSeed")
+	ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
 	return true
 }
 
diff --git a/webseed-peer.go b/webseed-peer.go
index 251d6fe9..19a055a6 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -30,6 +30,11 @@ type webseedPeer struct {
 	lastUnhandledErr time.Time
 }
 
+func (me *webseedPeer) updateRequests() {
+	//TODO implement me
+	panic("implement me")
+}
+
 func (me *webseedPeer) lastWriteUploadRate() float64 {
 	// We never upload to webseeds.
 	return 0
@@ -108,7 +113,7 @@ func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) {
 		// Delete this entry after waiting above on an error, to prevent more requests.
 		delete(ws.activeRequests, r)
 		if err != nil {
-			ws.peer.updateRequests("webseedPeer request errored")
+			ws.peer.onNeedUpdateRequests("webseedPeer request errored")
 		}
 		ws.spawnRequests()
 		locker.Unlock()
@@ -151,7 +156,7 @@ func (cn *webseedPeer) ban() {
 	cn.peer.close()
 }
 
-func (ws *webseedPeer) handleUpdateRequests() {
+func (ws *webseedPeer) handleOnNeedUpdateRequests() {
 	// Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
 	// pieces.
 	go func() {
@@ -167,7 +172,7 @@ func (ws *webseedPeer) onClose() {
 	ws.peer.cancelAllRequests()
 	ws.peer.t.iterPeers(func(p *Peer) {
 		if p.isLowOnRequests() {
-			p.updateRequests("webseedPeer.onClose")
+			p.onNeedUpdateRequests("webseedPeer.onClose")
 		}
 	})
 }
-- 
2.51.0