]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Keep webseed requests below client limit and update synchronously
authorMatt Joiner <anacrolix@gmail.com>
Mon, 19 May 2025 04:52:25 +0000 (14:52 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 19 May 2025 04:52:25 +0000 (14:52 +1000)
client.go
requesting.go
t.go
torrent.go
webseed-peer.go
webseed-requesting.go

index ce9ca2e4611dee09d6b1661b36bfa3b1bb3a6705..68eb9fd2ac0768426c874fdb82868cd1f5cfd384 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1370,9 +1370,9 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
                metadataChanged: sync.Cond{
                        L: cl.locker(),
                },
-               webSeeds:     make(map[string]*Peer),
                gotMetainfoC: make(chan struct{}),
        }
+       g.MakeMap(&t.webSeeds)
        t.closedCtx, t.closedCtxCancel = context.WithCancel(context.Background())
        var salt [8]byte
        rand.Read(salt[:])
@@ -1895,3 +1895,13 @@ func (cl *Client) Stats() ClientStats {
        defer cl.rUnlock()
        return cl.statsLocked()
 }
+
+func (cl *Client) underWebSeedHttpRequestLimit() bool {
+       num := 0
+       for t := range cl.torrents {
+               for _, p := range t.webSeeds {
+                       num += p.numRequests()
+               }
+       }
+       return num < 10
+}
index 8680665de0c7f626f6204dd70af1e66d593997a9..591b407ae640b870c02f93040d1c8594b8bd0b62 100644 (file)
@@ -305,7 +305,8 @@ func (p *Peer) allowSendNotInterested() bool {
 }
 
 // Transmit/action the request state to the peer. This includes work-stealing from other peers and
-// some piece order randomization within the preferred state calculated earlier in next.
+// some piece order randomization within the preferred state calculated earlier in next. Cancels are
+// not done here, those are handled synchronously. We only track pending cancel acknowledgements.
 func (p *Peer) applyRequestState(next desiredRequestState) {
        current := &p.requestState
        // Make interest sticky
diff --git a/t.go b/t.go
index 8a07579f403823de3c59eb99f5f0ed1d70575af0..cbad1959f9ccc255e37b0a85106a5f2f2ea65fa8 100644 (file)
--- a/t.go
+++ b/t.go
@@ -295,7 +295,7 @@ func (t *Torrent) WebseedPeerConns() []*Peer {
        defer t.cl.rUnlock()
        ret := make([]*Peer, 0, len(t.conns))
        for _, c := range t.webSeeds {
-               ret = append(ret, c)
+               ret = append(ret, &c.peer)
        }
        return ret
 }
index da5b69554d2dd2de0084aa3f97439cfc55fa2a06..e470fd881a8117003327360bf153ccd0fd54f73b 100644 (file)
@@ -113,7 +113,7 @@ type Torrent struct {
 
        _chunksPerRegularPiece chunkIndexType
 
-       webSeeds map[string]*Peer
+       webSeeds map[string]*webseedPeer
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
        conns               map[*PeerConn]struct{}
@@ -903,7 +903,14 @@ func (t *Torrent) writeStatus(w io.Writer) {
        dumpStats(w, t.statsLocked())
 
        fmt.Fprintf(w, "webseeds:\n")
-       t.writePeerStatuses(w, maps.Values(t.webSeeds))
+
+       t.writePeerStatuses(w, func(yield func(*Peer) bool) {
+               for _, ws := range t.webSeeds {
+                       if !yield(&ws.peer) {
+                               return
+                       }
+               }
+       })
 
        // Peers without priorities first, then those with. I'm undecided about how to order peers
        // without priorities.
@@ -2920,7 +2927,7 @@ func (t *Torrent) iterPeers(f func(p *Peer)) {
                f(&pc.Peer)
        }
        for _, ws := range t.webSeeds {
-               f(ws)
+               f(&ws.peer)
        }
 }
 
@@ -2961,11 +2968,12 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
                return false
        }
        // I don't think Go http supports pipelining requests. However, we can have more ready to go
-       // right away. This value should be some multiple of the number of connections to a host. I
-       // would expect that double maxRequests plus a bit would be appropriate. This value is based on
-       // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
-       // "https://webtorrent.io/torrents/".
-       const defaultMaxRequests = 16
+       // right away. This value should be some multiple of the number of connections to a host. This
+       // number is based on keeping at least one connection actively downloading while another request
+       // is fired off, and ensuring race detection works. Downloading Sintel
+       // (08ada5a7a6183aae1e09d831df6748d566095a10) from "https://webtorrent.io/torrents/" is a good
+       // test.
+       const defaultMaxRequests = 2
        ws := webseedPeer{
                peer: Peer{
                        t:                        t,
@@ -2993,11 +3001,9 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
                opt(&ws.client)
        }
        g.MakeMapWithCap(&ws.activeRequests, ws.client.MaxRequests)
-       // This should affect how often we have to recompute requests for this peer. Note that
-       // because we can request more than 1 thing at a time over HTTP, we will hit the low
-       // requests mark more often, so recomputation is probably sooner than with regular peer
-       // conns. ~4x maxRequests would be about right.
-       ws.peer.PeerMaxRequests = 4 * ws.client.MaxRequests
+       // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
+       // we just allow 2 MiB worth of requests. See newHotPeerImpl.nominalMaxRequests.
+       ws.peer.PeerMaxRequests = intCeilDiv(2<<20, ws.peer.t.chunkSize.Int())
        ws.peer.initUpdateRequestsTimer()
        ws.locker = t.cl.locker()
        for _, f := range t.callbacks().NewPeer {
@@ -3010,7 +3016,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
        if t.haveInfo() {
                ws.onGotInfo(t.info)
        }
-       t.webSeeds[url] = &ws.peer
+       t.webSeeds[url] = &ws
        ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
        return true
 }
index fed0e8bd2ffafac0b91241d03105eee6c782229b..caa5cc5b0f539e7f4d403170b6d9bce66697d491 100644 (file)
@@ -12,7 +12,6 @@ import (
        "time"
 
        "github.com/RoaringBitmap/roaring"
-       g "github.com/anacrolix/generics"
 
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -31,14 +30,26 @@ type webseedPeer struct {
 func (me *webseedPeer) nominalMaxRequests() maxRequests {
        // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
        // we just allow 2 MiB worth of requests.
-       return intCeilDiv(2<<20, me.peer.t.chunkSize.Int())
+       return me.peer.PeerMaxRequests
 }
 
 func (me *webseedPeer) acksCancels() bool {
        return false
 }
 
+func (me *webseedPeer) numRequests() int {
+       // What about unassigned requests? TODO: Don't allow those.
+       return len(me.activeRequests)
+}
+
+func (me *webseedPeer) shouldUpdateRequests() bool {
+       return me.peer.t.cl.underWebSeedHttpRequestLimit()
+}
+
 func (me *webseedPeer) updateRequests() {
+       if !me.shouldUpdateRequests() {
+               return
+       }
        p := &me.peer
        next := p.getDesiredRequestState()
        p.applyRequestState(next)
@@ -159,14 +170,14 @@ func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
        if err != nil {
                ws.peer.onNeedUpdateRequests("webseedPeer request errored")
        }
-       ws.spawnRequests()
+       ws.peer.t.cl.updateWebSeedRequests()
        locker.Unlock()
 }
 
 func (ws *webseedPeer) spawnRequests() {
        next, stop := iter.Pull(ws.inactiveRequests())
        defer stop()
-       for len(ws.activeRequests) <= ws.client.MaxRequests {
+       for len(ws.activeRequests) < ws.client.MaxRequests && ws.peer.t.cl.underWebSeedHttpRequestLimit() {
                req, ok := next()
                if !ok {
                        break
@@ -239,13 +250,7 @@ func (cn *webseedPeer) ban() {
 }
 
 func (ws *webseedPeer) handleOnNeedUpdateRequests() {
-       // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
-       // pieces.
-       go func() {
-               ws.peer.t.cl.lock()
-               defer ws.peer.t.cl.unlock()
-               ws.peer.maybeUpdateActualRequestState()
-       }()
+       ws.peer.maybeUpdateActualRequestState()
 }
 
 func (ws *webseedPeer) onClose() {
index bbcbc9b9cee1264ce7f4ae66d60083c8d5c497bf..383b14372989303b622ad2a4c9fe9740122f2a6c 100644 (file)
@@ -10,8 +10,10 @@ import (
 - For each piece calculate files involved. Record each file not seen before and the piece index.
 - Cancel any outstanding requests that don't match a final file/piece-index pair.
 - Initiate missing requests that fit into the available limits.
+
+This was a globally aware webseed requestor algorithm that is probably going to be abandoned.
 */
-func (cl *Client) updateWebSeedRequests() {
+func (cl *Client) abandonedUpdateWebSeedRequests() {
        for key, value := range cl.pieceRequestOrder {
                input := key.getRequestStrategyInput(cl)
                requestStrategy.GetRequestablePieces(
@@ -23,3 +25,11 @@ func (cl *Client) updateWebSeedRequests() {
                )
        }
 }
+
+func (cl *Client) updateWebSeedRequests() {
+       for t := range cl.torrents {
+               for _, p := range t.webSeeds {
+                       p.updateRequests()
+               }
+       }
+}