"io"
"iter"
"log/slog"
+ "math/rand"
"slices"
"sync"
"time"
"github.com/RoaringBitmap/roaring"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/webseed"
)
func (me *webseedPeer) shouldUpdateRequests() bool {
- return me.peer.t.cl.underWebSeedHttpRequestLimit()
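+ // Stay under both the webseed client's MaxRequests cap and the client-wide webseed HTTP request limit.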
+ return me.numRequests() < me.client.MaxRequests && me.peer.t.cl.underWebSeedHttpRequestLimit()
}
func (me *webseedPeer) updateRequests() {
func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] {
return func(yield func(*webseedRequest) bool) {
for wr := range ws.activeRequests {
- if r < wr.next || r >= wr.end {
- continue
- }
- if !yield(wr) {
- return
+ if r >= wr.next && r < wr.end {
+ if !yield(wr) {
+ return
+ }
}
}
}
end: end,
}
ws.activeRequests[&wsReq] = struct{}{}
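+ // Track the client-wide count of in-flight webseed requests.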
+ ws.peer.t.cl.numWebSeedRequests++
ws.peer.logger.Slogger().Debug(
"starting webseed request",
"begin", begin,
err := ws.readChunks(webseedRequest)
// Ensure the body reader and response are closed.
webseedRequest.request.Cancel()
- locker.Lock()
if err != nil {
level := slog.LevelWarn
if errors.Is(err, context.Canceled) {
level = slog.LevelDebug
- } else {
- panic(err)
}
ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err)
- // // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
- // // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
- // // it doesn't hurt to slow a few down if there are issues.
- // select {
- // case <-ws.peer.closed.Done():
- // case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
- // }
+ // This used to occur only on webseed.ErrTooFast, but I think it makes sense to slow down on
+ // any kind of error. Pausing here starves the available requester slots, which slows things
+ // down.
+ select {
+ case <-ws.peer.closed.Done():
+ case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+ }
}
- //locker.Lock()
+ ws.slogger().Debug("webseed request ended")
+ locker.Lock()
// Delete this entry after waiting above on an error, to prevent more requests.
- delete(ws.activeRequests, webseedRequest)
+ ws.deleteActiveRequest(webseedRequest)
if err != nil {
ws.peer.onNeedUpdateRequests("webseedPeer request errored")
}
- ws.peer.t.cl.updateWebSeedRequests()
+ ws.peer.t.cl.updateWebSeedRequests("webseedPeer request completed")
locker.Unlock()
}
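+// deleteActiveRequest removes the request from activeRequests and keeps the client-wide webseed request count in sync.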
+func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) {
+ g.MustDelete(ws.activeRequests, wr)
+ ws.peer.t.cl.numWebSeedRequests--
+}
+
func (ws *webseedPeer) spawnRequests() {
next, stop := iter.Pull(ws.inactiveRequests())
defer stop()
- for len(ws.activeRequests) < ws.client.MaxRequests && ws.peer.t.cl.underWebSeedHttpRequestLimit() {
+ for {
+ if len(ws.activeRequests) >= ws.client.MaxRequests {
+ break
+ }
req, ok := next()
if !ok {
break
}
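+ // Also stop once the client-wide webseed HTTP request limit is reached.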
+ if !ws.peer.t.cl.underWebSeedHttpRequestLimit() {
+ break
+ }
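+ // Extend the HTTP request to cover the whole consecutive run of inactive request indices starting at req.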
end := seqLast(ws.iterConsecutiveInactiveRequests(req)).Unwrap()
ws.spawnRequest(req, end+1)
}