]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Create webseed requester goroutines as required
authorMatt Joiner <anacrolix@gmail.com>
Tue, 6 May 2025 23:36:33 +0000 (09:36 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 6 May 2025 23:36:33 +0000 (09:36 +1000)
peer-impl.go
torrent.go
webseed-peer.go

index 2a036aea4979b87691887818bdd85f7bfbf9bdd2..391bce681c477b4900ab87c1fb6d861cf238121c 100644 (file)
@@ -19,6 +19,8 @@ type legacyPeerImpl interface {
        // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
        // handled by a follow-up event.
        _cancel(RequestIndex) (acked bool)
+       // The final piece to actually commit to a request. Typically, this sends or begins handling the
+       // request.
        _request(Request) bool
        connectionFlags() string
        onClose()
index 116507fa20fe6f5781df59838824f322e30952d3..93e5b86814bcc74830aa66af3fcfa36216c0a808 100644 (file)
@@ -3007,10 +3007,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
        // conns. ~4x maxRequests would be about right.
        ws.peer.PeerMaxRequests = 4 * ws.client.MaxRequests
        ws.peer.initUpdateRequestsTimer()
-       ws.requesterCond.L = t.cl.locker()
-       for i := 0; i < ws.client.MaxRequests; i += 1 {
-               go ws.requester(i)
-       }
+       ws.locker = t.cl.locker()
        for _, f := range t.callbacks().NewPeer {
                f(&ws.peer)
        }
index c6da58fffff33d4ab5c6cc9daa40ff78f94edcad..251d6fe94196bb5569bfa0e9bf7ad0e074a21fc0 100644 (file)
@@ -4,6 +4,7 @@ import (
        "context"
        "errors"
        "fmt"
+       "iter"
        "math/rand"
        "sync"
        "time"
@@ -25,7 +26,7 @@ type webseedPeer struct {
        peer             Peer
        client           webseed.Client
        activeRequests   map[Request]webseed.Request
-       requesterCond    sync.Cond
+       locker           sync.Locker
        lastUnhandledErr time.Time
 }
 
@@ -76,59 +77,66 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
 }
 
 func (ws *webseedPeer) _request(r Request) bool {
-       ws.requesterCond.Signal()
+       ws.spawnRequests()
        return true
 }
 
-// Returns true if we should look for another request to start. Returns false if we handled this
-// one.
-func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool {
-       r := ws.peer.t.requestIndexToRequest(x)
-       if _, ok := ws.activeRequests[r]; ok {
-               return true
-       }
+func (ws *webseedPeer) spawnReq(r Request) {
        webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
        ws.activeRequests[r] = webseedRequest
-       locker := ws.requesterCond.L
-       err := func() error {
-               locker.Unlock()
-               defer locker.Lock()
-               return ws.requestResultHandler(r, webseedRequest)
-       }()
-       delete(ws.activeRequests, r)
+       go ws.doRequest(r, webseedRequest)
+}
+
+func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) {
+       locker := ws.locker
+       err := ws.requestResultHandler(r, webseedRequest)
        if err != nil {
                level := log.Warning
                if errors.Is(err, context.Canceled) {
                        level = log.Debug
                }
-               ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err)
+               ws.peer.logger.Levelf(level, "error doing webseed request %v: %v", r, err)
                // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
-               // kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing around
+               // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
                // it doesn't hurt to slow a few down if there are issues.
-               locker.Unlock()
                select {
                case <-ws.peer.closed.Done():
                case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
                }
-               locker.Lock()
+       }
+       locker.Lock()
+       // Delete this entry after waiting above on an error, to prevent more requests.
+       delete(ws.activeRequests, r)
+       if err != nil {
                ws.peer.updateRequests("webseedPeer request errored")
        }
-       return false
+       ws.spawnRequests()
+       locker.Unlock()
+}
 
+func (ws *webseedPeer) spawnRequests() {
+       next, stop := iter.Pull(ws.inactiveRequests())
+       defer stop()
+       for len(ws.activeRequests) <= ws.client.MaxRequests {
+               req, ok := next()
+               if !ok {
+                       break
+               }
+               ws.spawnReq(req)
+       }
 }
 
-func (ws *webseedPeer) requester(i int) {
-       ws.requesterCond.L.Lock()
-       defer ws.requesterCond.L.Unlock()
-start:
-       for !ws.peer.closed.IsSet() {
+func (ws *webseedPeer) inactiveRequests() iter.Seq[Request] {
+       return func(yield func(Request) bool) {
                for reqIndex := range ws.peer.requestState.Requests.Iterator() {
-                       if !ws.requestIteratorLocked(i, reqIndex) {
-                               goto start
+                       r := ws.peer.t.requestIndexToRequest(reqIndex)
+                       _, ok := ws.activeRequests[r]
+                       if !ok {
+                               if !yield(r) {
+                                       return
+                               }
                        }
                }
-               // Found no requests to handle, so wait.
-               ws.requesterCond.Wait()
        }
 }
 
@@ -162,7 +170,6 @@ func (ws *webseedPeer) onClose() {
                        p.updateRequests("webseedPeer.onClose")
                }
        })
-       ws.requesterCond.Broadcast()
 }
 
 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {