From 44c0fc1e6a75c8204b04ebb4d04a7a3404e2c06e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 30 Jun 2025 15:51:03 +1000 Subject: [PATCH] Should fix download rate for webseed peers --- peer-impl.go | 7 ++----- peer.go | 24 ++++++------------------ peerconn.go | 20 ++++++++++++++++---- requesting.go | 2 +- torrent.go | 7 +++---- webseed-peer.go | 19 ++++++++----------- 6 files changed, 36 insertions(+), 43 deletions(-) diff --git a/peer-impl.go b/peer-impl.go index 76542352..211097e8 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -18,10 +18,8 @@ type legacyPeerImpl interface { // Actually go ahead and modify the pending requests. updateRequests() - // handleCancel initiates cancellation of a request and returns acked if it expects the cancel - // to be handled by a follow-up event. + // handleCancel initiates cancellation of a request handleCancel(RequestIndex) - acksCancels() bool // The final piece to actually commit to a request. Typically, this sends or begins handling the // request. _request(Request) bool @@ -46,7 +44,6 @@ type legacyPeerImpl interface { // Abstract methods implemented by subclasses of Peer. type newHotPeerImpl interface { lastWriteUploadRate() float64 - // How many requests should be assigned to the peer. - nominalMaxRequests() maxRequests checkReceivedChunk(ri RequestIndex) error + expectingChunks() bool } diff --git a/peer.go b/peer.go index 370c3d32..4ea6eb87 100644 --- a/peer.go +++ b/peer.go @@ -26,6 +26,7 @@ import ( ) type ( + // Generic Peer-like fields. Could be WebSeed, BitTorrent over TCP, uTP or WebRTC. Peer struct { // First to ensure 64-bit alignment for atomics. See #262. _stats ConnStats @@ -150,7 +151,7 @@ func (p *Peer) initRequestState() { } func (cn *Peer) updateExpectingChunks() { - if cn.expectingChunks() { + if cn.peerImpl.expectingChunks() { if cn.lastStartedExpectingToReceiveChunks.IsZero() { cn.lastStartedExpectingToReceiveChunks = time.Now() } @@ -162,7 +163,7 @@ func (cn *Peer) updateExpectingChunks() { } } -func (cn *Peer) expectingChunks() bool { +func (cn *PeerConn) expectingChunks() bool { if cn.requestState.Requests.IsEmpty() { return false } @@ -306,19 +307,12 @@ func (cn *Peer) writeStatus(w io.Writer) { cn.totalExpectingTime(), ) fmt.Fprintf(w, - "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", + "%s completed, %d pieces touched, good chunks: %v/%v:%v dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, - cn.requestState.Requests.GetCardinality(), - cn.requestState.Cancelled.GetCardinality(), - cn.peerImpl.nominalMaxRequests(), - cn.PeerMaxRequests, - len(cn.peerRequests), - localClientReqq, - cn.statusFlags(), cn.downloadRate()/(1<<10), ) fmt.Fprintf(w, "requested pieces:") @@ -460,7 +454,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) { if cn.requestState.Requests.Contains(r) { return true, nil } - if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() { + if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { return true, errors.New("too many outstanding requests") } cn.requestState.Requests.Add(r) @@ -485,12 +479,6 @@ func (me *Peer) cancel(r RequestIndex) { panic("request not existing should have been guarded") } me.handleCancel(r) - if me.acksCancels() { - // Record that we expect to get a cancel ack. - if !me.requestState.Cancelled.CheckedAdd(r) { - panic("request already cancelled") - } - } me.decPeakRequests() if me.isLowOnRequests() { me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason) @@ -676,7 +664,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { return nil } - piece := &t.pieces[ppReq.Index] + piece := t.piece(ppReq.Index.Int()) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) diff --git a/peerconn.go b/peerconn.go index 340196f1..4386b22f 100644 --- a/peerconn.go +++ b/peerconn.go @@ -137,6 +137,16 @@ func (cn *PeerConn) peerImplStatusLines() []string { fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes), fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs), fmt.Sprintf("pex: %s", cn.pexStatus()), + fmt.Sprintf( + "reqq: %d+%v/(%d/%d):%d/%d, flags: %s", + cn.requestState.Requests.GetCardinality(), + cn.requestState.Cancelled.GetCardinality(), + cn.nominalMaxRequests(), + cn.PeerMaxRequests, + len(cn.peerRequests), + localClientReqq, + cn.statusFlags(), + ), } } @@ -336,10 +346,12 @@ func (me *PeerConn) _request(r Request) bool { func (me *PeerConn) handleCancel(r RequestIndex) { me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) -} - -func (me *PeerConn) acksCancels() bool { - return me.remoteRejectsCancels() + if me.remoteRejectsCancels() { + // Record that we expect to get a cancel ack. + if !me.requestState.Cancelled.CheckedAdd(r) { + panic("request already cancelled") + } + } } // Whether we should expect a reject message after sending a cancel. diff --git a/requesting.go b/requesting.go index a8c6fe3c..33232f14 100644 --- a/requesting.go +++ b/requesting.go @@ -342,7 +342,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) { break } numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality()) - if numPending >= p.peerImpl.nominalMaxRequests() { + if numPending >= p.nominalMaxRequests() { break } req := heap.Pop(requestHeap) diff --git a/torrent.go b/torrent.go index 56ccf78a..37f304df 100644 --- a/torrent.go +++ b/torrent.go @@ -1328,16 +1328,13 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( } func (t *Torrent) haveChunk(r Request) (ret bool) { - // defer func() { - // log.Println("have chunk", r, ret) - // }() if !t.haveInfo() { return false } if t.pieceComplete(pieceIndex(r.Index)) { return true } - p := &t.pieces[r.Index] + p := t.piece(int(r.Index)) return !p.pendingChunk(r.ChunkSpec, t.chunkSize) } @@ -2998,6 +2995,8 @@ func (t *Torrent) callbacks() *Callbacks { type AddWebSeedsOpt func(*webseed.Client) +// TODO: Add a webseed http.Client option. + // Max concurrent requests to a WebSeed for a given torrent. func WebSeedTorrentMaxRequests(maxRequests int) AddWebSeedsOpt { return func(c *webseed.Client) { diff --git a/webseed-peer.go b/webseed-peer.go index f40e9dbf..0f5a2944 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -29,18 +29,12 @@ type webseedPeer struct { hostKey webseedHostKeyHandle } -func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error { - return nil -} - -func (me *webseedPeer) nominalMaxRequests() maxRequests { - // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now - // we just allow 2 MiB worth of requests. - return me.peer.PeerMaxRequests +func (me *webseedPeer) expectingChunks() bool { + return len(me.activeRequests) > 0 } -func (me *webseedPeer) acksCancels() bool { - return false +func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error { + return nil } func (me *webseedPeer) numRequests() int { @@ -134,6 +128,7 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) { end: end, } ws.activeRequests[&wsReq] = struct{}{} + ws.peer.updateExpectingChunks() panicif.Zero(ws.hostKey) ws.peer.t.cl.numWebSeedRequests[ws.hostKey]++ ws.slogger().Debug( @@ -159,7 +154,8 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) { torrent.Add("webseed request error count", 1) // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any // kind of error. Pausing here will starve the available requester slots which slows things - // down. + // down. TODO: I don't think this will help anymore. Need to register a reduced concurrency + // available for a host/cost key. select { case <-ws.peer.closed.Done(): case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): @@ -179,6 +175,7 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) { func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) { g.MustDelete(ws.activeRequests, wr) ws.peer.t.cl.numWebSeedRequests[ws.hostKey]-- + ws.peer.updateExpectingChunks() } func (ws *webseedPeer) spawnRequests() { -- 2.51.0