From ea4e804c2716c39a00419d73a220de6bef39956e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 19 May 2025 14:52:25 +1000 Subject: [PATCH] Keep webseed requests below client limit and update synchronously --- client.go | 12 +++++++++++- requesting.go | 3 ++- t.go | 2 +- torrent.go | 34 ++++++++++++++++++++-------------- webseed-peer.go | 27 ++++++++++++++++----------- webseed-requesting.go | 12 +++++++++++- 6 files changed, 61 insertions(+), 29 deletions(-) diff --git a/client.go b/client.go index ce9ca2e4..68eb9fd2 100644 --- a/client.go +++ b/client.go @@ -1370,9 +1370,9 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { metadataChanged: sync.Cond{ L: cl.locker(), }, - webSeeds: make(map[string]*Peer), gotMetainfoC: make(chan struct{}), } + g.MakeMap(&t.webSeeds) t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background()) var salt [8]byte rand.Read(salt[:]) @@ -1895,3 +1895,13 @@ func (cl *Client) Stats() ClientStats { defer cl.rUnlock() return cl.statsLocked() } + +func (cl *Client) underWebSeedHttpRequestLimit() bool { + num := 0 + for t := range cl.torrents { + for _, p := range t.webSeeds { + num += p.numRequests() + } + } + return num < 10 +} diff --git a/requesting.go b/requesting.go index 8680665d..591b407a 100644 --- a/requesting.go +++ b/requesting.go @@ -305,7 +305,8 @@ func (p *Peer) allowSendNotInterested() bool { } // Transmit/action the request state to the peer. This includes work-stealing from other peers and -// some piece order randomization within the preferred state calculated earlier in next. +// some piece order randomization within the preferred state calculated earlier in next. Cancels are +// not done here, those are handled synchronously. We only track pending cancel acknowledgements. func (p *Peer) applyRequestState(next desiredRequestState) { current := &p.requestState // Make interest sticky diff --git a/t.go b/t.go index 8a07579f..cbad1959 100644 --- a/t.go +++ b/t.go @@ -295,7 +295,7 @@ func (t *Torrent) WebseedPeerConns() []*Peer { defer t.cl.rUnlock() ret := make([]*Peer, 0, len(t.conns)) for _, c := range t.webSeeds { - ret = append(ret, c) + ret = append(ret, &c.peer) } return ret } diff --git a/torrent.go b/torrent.go index da5b6955..e470fd88 100644 --- a/torrent.go +++ b/torrent.go @@ -113,7 +113,7 @@ type Torrent struct { _chunksPerRegularPiece chunkIndexType - webSeeds map[string]*Peer + webSeeds map[string]*webseedPeer // Active peer connections, running message stream loops. TODO: Make this // open (not-closed) connections only. conns map[*PeerConn]struct{} @@ -903,7 +903,14 @@ func (t *Torrent) writeStatus(w io.Writer) { dumpStats(w, t.statsLocked()) fmt.Fprintf(w, "webseeds:\n") - t.writePeerStatuses(w, maps.Values(t.webSeeds)) + + t.writePeerStatuses(w, func(yield func(*Peer) bool) { + for _, ws := range t.webSeeds { + if !yield(&ws.peer) { + return + } + } + }) // Peers without priorities first, then those with. I'm undecided about how to order peers // without priorities. @@ -2920,7 +2927,7 @@ func (t *Torrent) iterPeers(f func(p *Peer)) { f(&pc.Peer) } for _, ws := range t.webSeeds { - f(ws) + f(&ws.peer) } } @@ -2961,11 +2968,12 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { return false } // I don't think Go http supports pipelining requests. However, we can have more ready to go - // right away. This value should be some multiple of the number of connections to a host. I - // would expect that double maxRequests plus a bit would be appropriate. This value is based on - // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from - // "https://webtorrent.io/torrents/". - const defaultMaxRequests = 16 + // right away. This value should be some multiple of the number of connections to a host. This + // number is based on keeping at least one connection actively downloading while another request + // is fired off, and ensuring race detection works. Downloading Sintel + // (08ada5a7a6183aae1e09d831df6748d566095a10) from "https://webtorrent.io/torrents/" is a good + // test. + const defaultMaxRequests = 2 ws := webseedPeer{ peer: Peer{ t: t, @@ -2993,11 +3001,9 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { opt(&ws.client) } g.MakeMapWithCap(&ws.activeRequests, ws.client.MaxRequests) - // This should affect how often we have to recompute requests for this peer. Note that - // because we can request more than 1 thing at a time over HTTP, we will hit the low - // requests mark more often, so recomputation is probably sooner than with regular peer - // conns. ~4x maxRequests would be about right. - ws.peer.PeerMaxRequests = 4 * ws.client.MaxRequests + // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now + // we just allow 2 MiB worth of requests. See newHotPeerImpl.nominalMaxRequests. + ws.peer.PeerMaxRequests = intCeilDiv(2<<20, ws.peer.t.chunkSize.Int()) ws.peer.initUpdateRequestsTimer() ws.locker = t.cl.locker() for _, f := range t.callbacks().NewPeer { @@ -3010,7 +3016,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { if t.haveInfo() { ws.onGotInfo(t.info) } - t.webSeeds[url] = &ws.peer + t.webSeeds[url] = &ws ws.peer.onNeedUpdateRequests("Torrent.addWebSeed") return true } diff --git a/webseed-peer.go b/webseed-peer.go index fed0e8bd..caa5cc5b 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -12,7 +12,6 @@ import ( "time" "github.com/RoaringBitmap/roaring" - g "github.com/anacrolix/generics" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -31,14 +30,26 @@ type webseedPeer struct { func (me *webseedPeer) nominalMaxRequests() maxRequests { // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now // we just allow 2 MiB worth of requests. - return intCeilDiv(2<<20, me.peer.t.chunkSize.Int()) + return me.peer.PeerMaxRequests } func (me *webseedPeer) acksCancels() bool { return false } +func (me *webseedPeer) numRequests() int { + // What about unassigned requests? TODO: Don't allow those. + return len(me.activeRequests) +} + +func (me *webseedPeer) shouldUpdateRequests() bool { + return me.peer.t.cl.underWebSeedHttpRequestLimit() +} + func (me *webseedPeer) updateRequests() { + if !me.shouldUpdateRequests() { + return + } p := &me.peer next := p.getDesiredRequestState() p.applyRequestState(next) @@ -159,14 +170,14 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) { if err != nil { ws.peer.onNeedUpdateRequests("webseedPeer request errored") } - ws.spawnRequests() + ws.peer.t.cl.updateWebSeedRequests() locker.Unlock() } func (ws *webseedPeer) spawnRequests() { next, stop := iter.Pull(ws.inactiveRequests()) defer stop() - for len(ws.activeRequests) <= ws.client.MaxRequests { + for len(ws.activeRequests) < ws.client.MaxRequests && ws.peer.t.cl.underWebSeedHttpRequestLimit() { req, ok := next() if !ok { break @@ -239,13 +250,7 @@ func (cn *webseedPeer) ban() { } func (ws *webseedPeer) handleOnNeedUpdateRequests() { - // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized - // pieces. - go func() { - ws.peer.t.cl.lock() - defer ws.peer.t.cl.unlock() - ws.peer.maybeUpdateActualRequestState() - }() + ws.peer.maybeUpdateActualRequestState() } func (ws *webseedPeer) onClose() { diff --git a/webseed-requesting.go b/webseed-requesting.go index bbcbc9b9..383b1437 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -10,8 +10,10 @@ import ( - For each piece calculate files involved. Record each file not seen before and the piece index. - Cancel any outstanding requests that don't match a final file/piece-index pair. - Initiate missing requests that fit into the available limits. + +This was a globally aware webseed requestor algorithm that is probably going to be abandoned. */ -func (cl *Client) updateWebSeedRequests() { +func (cl *Client) abandonedUpdateWebSeedRequests() { for key, value := range cl.pieceRequestOrder { input := key.getRequestStrategyInput(cl) requestStrategy.GetRequestablePieces( @@ -23,3 +25,11 @@ func (cl *Client) updateWebSeedRequests() { ) } } + +func (cl *Client) updateWebSeedRequests() { + for t := range cl.torrents { + for _, p := range t.webSeeds { + p.updateRequests() + } + } +} -- 2.51.0