]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Should fix download rate for webseed peers
authorMatt Joiner <anacrolix@gmail.com>
Mon, 30 Jun 2025 05:51:03 +0000 (15:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 30 Jun 2025 05:51:15 +0000 (15:51 +1000)
peer-impl.go
peer.go
peerconn.go
requesting.go
torrent.go
webseed-peer.go

index 76542352724fc987249a46a60cb02ec44739f0e7..211097e84d9d41dda5a2b8a9d7f78f15ae8735b0 100644 (file)
@@ -18,10 +18,8 @@ type legacyPeerImpl interface {
        // Actually go ahead and modify the pending requests.
        updateRequests()
 
-       // handleCancel initiates cancellation of a request and returns acked if it expects the cancel
-       // to be handled by a follow-up event.
+       // handleCancel initiates cancellation of a request
        handleCancel(RequestIndex)
-       acksCancels() bool
        // The final piece to actually commit to a request. Typically, this sends or begins handling the
        // request.
        _request(Request) bool
@@ -46,7 +44,6 @@ type legacyPeerImpl interface {
 // Abstract methods implemented by subclasses of Peer.
 type newHotPeerImpl interface {
        lastWriteUploadRate() float64
-       // How many requests should be assigned to the peer.
-       nominalMaxRequests() maxRequests
        checkReceivedChunk(ri RequestIndex) error
+       expectingChunks() bool
 }
diff --git a/peer.go b/peer.go
index 370c3d32fa2295f604f9baa05e7693f6afdd3794..4ea6eb8789c0e4d40949604db801d3cf412a984d 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -26,6 +26,7 @@ import (
 )
 
 type (
+       // Generic Peer-like fields. Could be WebSeed, BitTorrent over TCP, uTP or WebRTC.
        Peer struct {
                // First to ensure 64-bit alignment for atomics. See #262.
                _stats ConnStats
@@ -150,7 +151,7 @@ func (p *Peer) initRequestState() {
 }
 
 func (cn *Peer) updateExpectingChunks() {
-       if cn.expectingChunks() {
+       if cn.peerImpl.expectingChunks() {
                if cn.lastStartedExpectingToReceiveChunks.IsZero() {
                        cn.lastStartedExpectingToReceiveChunks = time.Now()
                }
@@ -162,7 +163,7 @@ func (cn *Peer) updateExpectingChunks() {
        }
 }
 
-func (cn *Peer) expectingChunks() bool {
+func (cn *PeerConn) expectingChunks() bool {
        if cn.requestState.Requests.IsEmpty() {
                return false
        }
@@ -306,19 +307,12 @@ func (cn *Peer) writeStatus(w io.Writer) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+               "%s completed, %d pieces touched, good chunks: %v/%v:%v dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               cn.requestState.Requests.GetCardinality(),
-               cn.requestState.Cancelled.GetCardinality(),
-               cn.peerImpl.nominalMaxRequests(),
-               cn.PeerMaxRequests,
-               len(cn.peerRequests),
-               localClientReqq,
-               cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
        fmt.Fprintf(w, "requested pieces:")
@@ -460,7 +454,7 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
        if cn.requestState.Requests.Contains(r) {
                return true, nil
        }
-       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() {
+       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
                return true, errors.New("too many outstanding requests")
        }
        cn.requestState.Requests.Add(r)
@@ -485,12 +479,6 @@ func (me *Peer) cancel(r RequestIndex) {
                panic("request not existing should have been guarded")
        }
        me.handleCancel(r)
-       if me.acksCancels() {
-               // Record that we expect to get a cancel ack.
-               if !me.requestState.Cancelled.CheckedAdd(r) {
-                       panic("request already cancelled")
-               }
-       }
        me.decPeakRequests()
        if me.isLowOnRequests() {
                me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
@@ -676,7 +664,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                return nil
        }
 
-       piece := &t.pieces[ppReq.Index]
+       piece := t.piece(ppReq.Index.Int())
 
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
index 340196f14cdf139b66a89bdf47b9f4c223c768f4..4386b22f2fef6d1dbc00dc4c71409f6cffb5af4b 100644 (file)
@@ -137,6 +137,16 @@ func (cn *PeerConn) peerImplStatusLines() []string {
                fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
                fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
                fmt.Sprintf("pex: %s", cn.pexStatus()),
+               fmt.Sprintf(
+                       "reqq: %d+%v/(%d/%d):%d/%d, flags: %s",
+                       cn.requestState.Requests.GetCardinality(),
+                       cn.requestState.Cancelled.GetCardinality(),
+                       cn.nominalMaxRequests(),
+                       cn.PeerMaxRequests,
+                       len(cn.peerRequests),
+                       localClientReqq,
+                       cn.statusFlags(),
+               ),
        }
 }
 
@@ -336,10 +346,12 @@ func (me *PeerConn) _request(r Request) bool {
 
 func (me *PeerConn) handleCancel(r RequestIndex) {
        me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
-}
-
-func (me *PeerConn) acksCancels() bool {
-       return me.remoteRejectsCancels()
+       if me.remoteRejectsCancels() {
+               // Record that we expect to get a cancel ack.
+               if !me.requestState.Cancelled.CheckedAdd(r) {
+                       panic("request already cancelled")
+               }
+       }
 }
 
 // Whether we should expect a reject message after sending a cancel.
index a8c6fe3cecde958dbfcbb833fddd3d6aee1efe86..33232f14983e90d1eb6b12651875ac8beb9f97e8 100644 (file)
@@ -342,7 +342,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) {
                        break
                }
                numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality())
-               if numPending >= p.peerImpl.nominalMaxRequests() {
+               if numPending >= p.nominalMaxRequests() {
                        break
                }
                req := heap.Pop(requestHeap)
index 56ccf78a2d474e0a304a81f89f4415ecc07fc59b..37f304dfcf8fb8040e39d42d10f03a6dc9015f4a 100644 (file)
@@ -1328,16 +1328,13 @@ func (t *Torrent) maybeDropMutuallyCompletePeer(
 }
 
 func (t *Torrent) haveChunk(r Request) (ret bool) {
-       // defer func() {
-       //      log.Println("have chunk", r, ret)
-       // }()
        if !t.haveInfo() {
                return false
        }
        if t.pieceComplete(pieceIndex(r.Index)) {
                return true
        }
-       p := &t.pieces[r.Index]
+       p := t.piece(int(r.Index))
        return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
 }
 
@@ -2998,6 +2995,8 @@ func (t *Torrent) callbacks() *Callbacks {
 
 type AddWebSeedsOpt func(*webseed.Client)
 
+// TODO: Add a webseed http.Client option.
+
 // Max concurrent requests to a WebSeed for a given torrent.
 func WebSeedTorrentMaxRequests(maxRequests int) AddWebSeedsOpt {
        return func(c *webseed.Client) {
index f40e9dbf4941742cd4a15012cc17a12dbeb519d3..0f5a29447e5eb836693c67bde67d3639fef7fee3 100644 (file)
@@ -29,18 +29,12 @@ type webseedPeer struct {
        hostKey          webseedHostKeyHandle
 }
 
-func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
-       return nil
-}
-
-func (me *webseedPeer) nominalMaxRequests() maxRequests {
-       // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
-       // we just allow 2 MiB worth of requests.
-       return me.peer.PeerMaxRequests
+func (me *webseedPeer) expectingChunks() bool {
+       return len(me.activeRequests) > 0
 }
 
-func (me *webseedPeer) acksCancels() bool {
-       return false
+func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
+       return nil
 }
 
 func (me *webseedPeer) numRequests() int {
@@ -134,6 +128,7 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) {
                end:     end,
        }
        ws.activeRequests[&wsReq] = struct{}{}
+       ws.peer.updateExpectingChunks()
        panicif.Zero(ws.hostKey)
        ws.peer.t.cl.numWebSeedRequests[ws.hostKey]++
        ws.slogger().Debug(
@@ -159,7 +154,8 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
                torrent.Add("webseed request error count", 1)
                // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
                // kind of error. Pausing here will starve the available requester slots which slows things
-               // down.
+               // down. TODO: I don't think this will help anymore. Need to register a reduced concurrency
+               // available for a host/cost key.
                select {
                case <-ws.peer.closed.Done():
                case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
@@ -179,6 +175,7 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
 func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) {
        g.MustDelete(ws.activeRequests, wr)
        ws.peer.t.cl.numWebSeedRequests[ws.hostKey]--
+       ws.peer.updateExpectingChunks()
 }
 
 func (ws *webseedPeer) spawnRequests() {