Sergey Matveev's repositories - btrtrc.git/commitdiff
Track num webseed requests, pause on webseed read errors
author Matt Joiner <anacrolix@gmail.com>
Wed, 21 May 2025 02:41:54 +0000 (12:41 +1000)
committer Matt Joiner <anacrolix@gmail.com>
Wed, 21 May 2025 02:41:54 +0000 (12:41 +1000)
client.go
webseed-peer.go

index 6fb57aeeddc7e0d5f752e82f378c938fe238f6ec..ef8306276b39b070d6e1fa3ad141ef985fc60659 100644 (file)
--- a/client.go
+++ b/client.go
@@ -94,7 +94,8 @@ type Client struct {
        acceptLimiter map[ipStr]int
        numHalfOpen   int
 
-       websocketTrackers websocketTrackers
+       websocketTrackers  websocketTrackers
+       numWebSeedRequests int
 
        activeAnnounceLimiter limiter.Instance
        httpClient            *http.Client
@@ -1911,11 +1912,14 @@ func (cl *Client) Stats() ClientStats {
 }
 
 func (cl *Client) underWebSeedHttpRequestLimit() bool {
-       num := 0
+       return cl.numWebSeedRequests < 10
+}
+
+func (cl *Client) countWebSeedHttpRequests() (num int) {
        for t := range cl.torrents {
                for _, p := range t.webSeeds {
                        num += p.numRequests()
                }
        }
-       return num < 10
+       return
 }
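
The client.go change above trades the per-call recount for a cached Client.numWebSeedRequests, keeping countWebSeedHttpRequests as the slow recount. The cache is only correct if every spawn and delete site adjusts it under the client lock (spawnRequest and deleteActiveRequest in webseed-peer.go below). A minimal sketch of that invariant, as a hypothetical debug assertion that is not part of this commit (it assumes the client lock is held and that client.go imports fmt):

// Hypothetical consistency check: the cached counter must equal a
// fresh recount taken under the same lock.
func (cl *Client) assertWebSeedRequestCount() {
        if n := cl.countWebSeedHttpRequests(); n != cl.numWebSeedRequests {
                panic(fmt.Sprintf("cached numWebSeedRequests %d != recounted %d", cl.numWebSeedRequests, n))
        }
}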
index caa5cc5b0f539e7f4d403170b6d9bce66697d491..d20b1945044cf734980e43cb117bd2a5621e3fa9 100644 (file)
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -7,12 +7,14 @@ import (
        "io"
        "iter"
        "log/slog"
+       "math/rand"
        "slices"
        "sync"
        "time"
 
        "github.com/RoaringBitmap/roaring"
 
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/webseed"
@@ -43,7 +45,7 @@ func (me *webseedPeer) numRequests() int {
 }
 
 func (me *webseedPeer) shouldUpdateRequests() bool {
-       return me.peer.t.cl.underWebSeedHttpRequestLimit()
+       return me.numRequests() < me.client.MaxRequests && me.peer.t.cl.underWebSeedHttpRequestLimit()
 }
 
 func (me *webseedPeer) updateRequests() {
@@ -99,11 +101,10 @@ func (ws *webseedPeer) handleCancel(r RequestIndex) {
 func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] {
        return func(yield func(*webseedRequest) bool) {
                for wr := range ws.activeRequests {
-                       if r < wr.next || r >= wr.end {
-                               continue
-                       }
-                       if !yield(wr) {
-                               return
+                       if r >= wr.next && r < wr.end {
+                               if !yield(wr) {
+                                       return
+                               }
                        }
                }
        }
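
The activeRequestsForIndex change is a pure refactor: the skip-and-continue guard is replaced by its De Morgan complement, so an index r matches an active request exactly when it falls in the half-open interval [wr.next, wr.end). A sketch of the equivalence (contains is illustrative only; RequestIndex is the package's existing index type):

// !(r < next || r >= end) == (r >= next && r < end): membership in
// the half-open interval [next, end).
func contains(r, next, end RequestIndex) bool {
        return r >= next && r < end
}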
@@ -133,6 +134,7 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) {
                end:     end,
        }
        ws.activeRequests[&wsReq] = struct{}{}
+       ws.peer.t.cl.numWebSeedRequests++
        ws.peer.logger.Slogger().Debug(
                "starting webseed request",
                "begin", begin,
@@ -147,41 +149,50 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
        err := ws.readChunks(webseedRequest)
        // Ensure the body reader and response are closed.
        webseedRequest.request.Cancel()
-       locker.Lock()
        if err != nil {
                level := slog.LevelWarn
                if errors.Is(err, context.Canceled) {
                        level = slog.LevelDebug
-               } else {
-                       panic(err)
                }
                ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err)
-               //      // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
-               //      // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
-               //      // it doesn't hurt to slow a few down if there are issues.
-               //      select {
-               //      case <-ws.peer.closed.Done():
-               //      case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
-               //      }
+               // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
+               // kind of error. Pausing here will starve the available requester slots which slows things
+               // down.
+               select {
+               case <-ws.peer.closed.Done():
+               case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+               }
        }
-       //locker.Lock()
+       ws.slogger().Debug("webseed request ended")
+       locker.Lock()
        // Delete this entry after waiting above on an error, to prevent more requests.
-       delete(ws.activeRequests, webseedRequest)
+       ws.deleteActiveRequest(webseedRequest)
        if err != nil {
                ws.peer.onNeedUpdateRequests("webseedPeer request errored")
        }
-       ws.peer.t.cl.updateWebSeedRequests()
+       ws.peer.t.cl.updateWebSeedRequests("webseedPeer request completed")
        locker.Unlock()
 }
 
+func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) {
+       g.MustDelete(ws.activeRequests, wr)
+       ws.peer.t.cl.numWebSeedRequests--
+}
+
 func (ws *webseedPeer) spawnRequests() {
        next, stop := iter.Pull(ws.inactiveRequests())
        defer stop()
-       for len(ws.activeRequests) < ws.client.MaxRequests && ws.peer.t.cl.underWebSeedHttpRequestLimit() {
+       for {
+               if len(ws.activeRequests) >= ws.client.MaxRequests {
+                       break
+               }
                req, ok := next()
                if !ok {
                        break
                }
+               if !ws.peer.t.cl.underWebSeedHttpRequestLimit() {
+                       break
+               }
                end := seqLast(ws.iterConsecutiveInactiveRequests(req)).Unwrap()
                ws.spawnRequest(req, end+1)
        }