From: Matt Joiner Date: Tue, 6 May 2025 23:36:33 +0000 (+1000) Subject: Create webseed requester goroutines as required X-Git-Tag: v1.59.0~175 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ed441b1763d0f45f5ca21160697dc6aca64610fc;p=btrtrc.git Create webseed requester goroutines as required --- diff --git a/peer-impl.go b/peer-impl.go index 2a036aea..391bce68 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -19,6 +19,8 @@ type legacyPeerImpl interface { // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be // handled by a follow-up event. _cancel(RequestIndex) (acked bool) + // The final piece to actually commit to a request. Typically, this sends or begins handling the + // request. _request(Request) bool connectionFlags() string onClose() diff --git a/torrent.go b/torrent.go index 116507fa..93e5b868 100644 --- a/torrent.go +++ b/torrent.go @@ -3007,10 +3007,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { // conns. ~4x maxRequests would be about right. ws.peer.PeerMaxRequests = 4 * ws.client.MaxRequests ws.peer.initUpdateRequestsTimer() - ws.requesterCond.L = t.cl.locker() - for i := 0; i < ws.client.MaxRequests; i += 1 { - go ws.requester(i) - } + ws.locker = t.cl.locker() for _, f := range t.callbacks().NewPeer { f(&ws.peer) } diff --git a/webseed-peer.go b/webseed-peer.go index c6da58ff..251d6fe9 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "iter" "math/rand" "sync" "time" @@ -25,7 +26,7 @@ type webseedPeer struct { peer Peer client webseed.Client activeRequests map[Request]webseed.Request - requesterCond sync.Cond + locker sync.Locker lastUnhandledErr time.Time } @@ -76,59 +77,66 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { } func (ws *webseedPeer) _request(r Request) bool { - ws.requesterCond.Signal() + ws.spawnRequests() return true } -// Returns true if we should look for another request to start. Returns false if we handled this -// one. -func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool { - r := ws.peer.t.requestIndexToRequest(x) - if _, ok := ws.activeRequests[r]; ok { - return true - } +func (ws *webseedPeer) spawnReq(r Request) { webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r)) ws.activeRequests[r] = webseedRequest - locker := ws.requesterCond.L - err := func() error { - locker.Unlock() - defer locker.Lock() - return ws.requestResultHandler(r, webseedRequest) - }() - delete(ws.activeRequests, r) + go ws.doRequest(r, webseedRequest) +} + +func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) { + locker := ws.locker + err := ws.requestResultHandler(r, webseedRequest) if err != nil { level := log.Warning if errors.Is(err, context.Canceled) { level = log.Debug } - ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err) + ws.peer.logger.Levelf(level, "error doing webseed request %v: %v", r, err) // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any - // kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing around + // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around // it doesn't hurt to slow a few down if there are issues. - locker.Unlock() select { case <-ws.peer.closed.Done(): case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): } - locker.Lock() + } + locker.Lock() + // Delete this entry after waiting above on an error, to prevent more requests. + delete(ws.activeRequests, r) + if err != nil { ws.peer.updateRequests("webseedPeer request errored") } - return false + ws.spawnRequests() + locker.Unlock() +} +func (ws *webseedPeer) spawnRequests() { + next, stop := iter.Pull(ws.inactiveRequests()) + defer stop() + for len(ws.activeRequests) <= ws.client.MaxRequests { + req, ok := next() + if !ok { + break + } + ws.spawnReq(req) + } } -func (ws *webseedPeer) requester(i int) { - ws.requesterCond.L.Lock() - defer ws.requesterCond.L.Unlock() -start: - for !ws.peer.closed.IsSet() { +func (ws *webseedPeer) inactiveRequests() iter.Seq[Request] { + return func(yield func(Request) bool) { for reqIndex := range ws.peer.requestState.Requests.Iterator() { - if !ws.requestIteratorLocked(i, reqIndex) { - goto start + r := ws.peer.t.requestIndexToRequest(reqIndex) + _, ok := ws.activeRequests[r] + if !ok { + if !yield(r) { + return + } } } - // Found no requests to handle, so wait. - ws.requesterCond.Wait() } } @@ -162,7 +170,6 @@ func (ws *webseedPeer) onClose() { p.updateRequests("webseedPeer.onClose") } }) - ws.requesterCond.Broadcast() } func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {