From e116127559ed30254c0fc1b6f143aa8003c2347f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 21 May 2025 12:41:54 +1000 Subject: [PATCH] Track num webseed requests, pause on webseed read errors --- client.go | 10 +++++++--- webseed-peer.go | 51 ++++++++++++++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/client.go b/client.go index 6fb57aee..ef830627 100644 --- a/client.go +++ b/client.go @@ -94,7 +94,8 @@ type Client struct { acceptLimiter map[ipStr]int numHalfOpen int - websocketTrackers websocketTrackers + websocketTrackers websocketTrackers + numWebSeedRequests int activeAnnounceLimiter limiter.Instance httpClient *http.Client @@ -1911,11 +1912,14 @@ func (cl *Client) Stats() ClientStats { } func (cl *Client) underWebSeedHttpRequestLimit() bool { - num := 0 + return cl.numWebSeedRequests < 10 +} + +func (cl *Client) countWebSeedHttpRequests() (num int) { for t := range cl.torrents { for _, p := range t.webSeeds { num += p.numRequests() } } - return num < 10 + return } diff --git a/webseed-peer.go b/webseed-peer.go index caa5cc5b..d20b1945 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -7,12 +7,14 @@ import ( "io" "iter" "log/slog" + "math/rand" "slices" "sync" "time" "github.com/RoaringBitmap/roaring" + g "github.com/anacrolix/generics" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/webseed" @@ -43,7 +45,7 @@ func (me *webseedPeer) numRequests() int { } func (me *webseedPeer) shouldUpdateRequests() bool { - return me.peer.t.cl.underWebSeedHttpRequestLimit() + return me.numRequests() < me.client.MaxRequests && me.peer.t.cl.underWebSeedHttpRequestLimit() } func (me *webseedPeer) updateRequests() { @@ -99,11 +101,10 @@ func (ws *webseedPeer) handleCancel(r RequestIndex) { func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] { return func(yield func(*webseedRequest) bool) { for wr := range ws.activeRequests { - if r < wr.next || r >= wr.end { - continue - } - if !yield(wr) { - return + if r >= wr.next && r < wr.end { + if !yield(wr) { + return + } } } } @@ -133,6 +134,7 @@ func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) { end: end, } ws.activeRequests[&wsReq] = struct{}{} + ws.peer.t.cl.numWebSeedRequests++ ws.peer.logger.Slogger().Debug( "starting webseed request", "begin", begin, @@ -147,41 +149,50 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) { err := ws.readChunks(webseedRequest) // Ensure the body reader and response are closed. webseedRequest.request.Cancel() - locker.Lock() if err != nil { level := slog.LevelWarn if errors.Is(err, context.Canceled) { level = slog.LevelDebug - } else { - panic(err) } ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err) - // // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any - // // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around - // // it doesn't hurt to slow a few down if there are issues. - // select { - // case <-ws.peer.closed.Done(): - // case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): - // } + // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any + // kind of error. Pausing here will starve the available requester slots which slows things + // down. + select { + case <-ws.peer.closed.Done(): + case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): + } } - //locker.Lock() + ws.slogger().Debug("webseed request ended") + locker.Lock() // Delete this entry after waiting above on an error, to prevent more requests. - delete(ws.activeRequests, webseedRequest) + ws.deleteActiveRequest(webseedRequest) if err != nil { ws.peer.onNeedUpdateRequests("webseedPeer request errored") } - ws.peer.t.cl.updateWebSeedRequests() + ws.peer.t.cl.updateWebSeedRequests("webseedPeer request completed") locker.Unlock() } +func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) { + g.MustDelete(ws.activeRequests, wr) + ws.peer.t.cl.numWebSeedRequests-- +} + func (ws *webseedPeer) spawnRequests() { next, stop := iter.Pull(ws.inactiveRequests()) defer stop() - for len(ws.activeRequests) < ws.client.MaxRequests && ws.peer.t.cl.underWebSeedHttpRequestLimit() { + for { + if len(ws.activeRequests) >= ws.client.MaxRequests { + break + } req, ok := next() if !ok { break } + if !ws.peer.t.cl.underWebSeedHttpRequestLimit() { + break + } end := seqLast(ws.iterConsecutiveInactiveRequests(req)).Unwrap() ws.spawnRequest(req, end+1) } -- 2.51.0