"context"
"errors"
"fmt"
+ "iter"
"math/rand"
"sync"
"time"
peer Peer
client webseed.Client
activeRequests map[Request]webseed.Request
- requesterCond sync.Cond
+ locker sync.Locker
lastUnhandledErr time.Time
}
}
func (ws *webseedPeer) _request(r Request) bool {
- ws.requesterCond.Signal()
+ ws.spawnRequests()
return true
}
-// Returns true if we should look for another request to start. Returns false if we handled this
-// one.
-func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool {
- r := ws.peer.t.requestIndexToRequest(x)
- if _, ok := ws.activeRequests[r]; ok {
- return true
- }
+func (ws *webseedPeer) spawnReq(r Request) {
webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest
- locker := ws.requesterCond.L
- err := func() error {
- locker.Unlock()
- defer locker.Lock()
- return ws.requestResultHandler(r, webseedRequest)
- }()
- delete(ws.activeRequests, r)
+ go ws.doRequest(r, webseedRequest)
+}
+
+func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) {
+ locker := ws.locker
+ err := ws.requestResultHandler(r, webseedRequest)
if err != nil {
level := log.Warning
if errors.Is(err, context.Canceled) {
level = log.Debug
}
- ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err)
+ ws.peer.logger.Levelf(level, "error doing webseed request %v: %v", r, err)
// This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
- // kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing around
+ // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
// it doesn't hurt to slow a few down if there are issues.
- locker.Unlock()
select {
case <-ws.peer.closed.Done():
case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
}
- locker.Lock()
+ }
+ locker.Lock()
+ // Delete this entry after waiting above on an error, to prevent more requests.
+ delete(ws.activeRequests, r)
+ if err != nil {
ws.peer.updateRequests("webseedPeer request errored")
}
- return false
+ ws.spawnRequests()
+ locker.Unlock()
+}
+func (ws *webseedPeer) spawnRequests() {
+ next, stop := iter.Pull(ws.inactiveRequests())
+ defer stop()
+ for len(ws.activeRequests) <= ws.client.MaxRequests {
+ req, ok := next()
+ if !ok {
+ break
+ }
+ ws.spawnReq(req)
+ }
}
-func (ws *webseedPeer) requester(i int) {
- ws.requesterCond.L.Lock()
- defer ws.requesterCond.L.Unlock()
-start:
- for !ws.peer.closed.IsSet() {
+func (ws *webseedPeer) inactiveRequests() iter.Seq[Request] {
+ return func(yield func(Request) bool) {
for reqIndex := range ws.peer.requestState.Requests.Iterator() {
- if !ws.requestIteratorLocked(i, reqIndex) {
- goto start
+ r := ws.peer.t.requestIndexToRequest(reqIndex)
+ _, ok := ws.activeRequests[r]
+ if !ok {
+ if !yield(r) {
+ return
+ }
}
}
- // Found no requests to handle, so wait.
- ws.requesterCond.Wait()
}
}
p.updateRequests("webseedPeer.onClose")
}
})
- ws.requesterCond.Broadcast()
}
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {