import (
g "github.com/anacrolix/generics"
list "github.com/bahlo/generic-list-go"
+ "iter"
"github.com/anacrolix/torrent/typed-roaring"
)
return o.bitmap.Rank(index)
}
-func (o *orderedBitmap[T]) Iterate(f func(T) bool) {
+func (o *orderedBitmap[T]) Iterate(f func(T) bool) (all bool) {
for e := o.order.Front(); e != nil; e = e.Next() {
if !f(e.Value) {
return
}
}
+ all = true
+ return
+}
+
+func (o *orderedBitmap[T]) Iterator() iter.Seq[T] {
+ return func(yield func(T) bool) {
+ for e := o.order.Front(); e != nil; e = e.Next() {
+ if !yield(e.Value) {
+ return
+ }
+ }
+ }
}
func (o *orderedBitmap[T]) CheckedRemove(index T) bool {
import (
typedRoaring "github.com/anacrolix/torrent/typed-roaring"
+ "iter"
)
type PeerRequestState struct {
// See roaring.Bitmap.Rank.
Rank(RequestIndex) uint64
// Must yield in order items were added.
- Iterate(func(RequestIndex) bool)
+ Iterate(func(RequestIndex) bool) (all bool)
+ // Must yield in order items were added.
+ Iterator() iter.Seq[RequestIndex]
// See roaring.Bitmap.CheckedRemove.
CheckedRemove(RequestIndex) bool
// Iterate a snapshot of the values. It is safe to mutate the underlying data structure.
)
const (
- webseedPeerUnhandledErrorSleep = 5 * time.Second
webseedPeerCloseOnUnhandledError = false
)
return true
}
-func (ws *webseedPeer) doRequest(r Request) error {
- webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+// Returns true if we should look for another request to start. Returns false if we handled this
+// one.
+func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool {
+ r := ws.peer.t.requestIndexToRequest(x)
+ if _, ok := ws.activeRequests[r]; ok {
+ return true
+ }
+ webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest
err := func() error {
ws.requesterCond.L.Unlock()
return ws.requestResultHandler(r, webseedRequest)
}()
delete(ws.activeRequests, r)
- return err
+ ws.requesterCond.L.Unlock()
+ defer ws.requesterCond.L.Lock()
+ if err != nil {
+ level := log.Warning
+ if errors.Is(err, context.Canceled) {
+ level = log.Debug
+ }
+ ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err)
+ // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down
+ // any kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing
+ // around it doesn't hurt to slow a few down if there are issues.
+ select {
+ case <-ws.peer.closed.Done():
+ case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+ }
+ }
+ return false
+
}
func (ws *webseedPeer) requester(i int) {
defer ws.requesterCond.L.Unlock()
start:
for !ws.peer.closed.IsSet() {
- // Restart is set if we don't need to wait for the requestCond before trying again.
- restart := false
- ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
- r := ws.peer.t.requestIndexToRequest(x)
- if _, ok := ws.activeRequests[r]; ok {
- return true
- }
- err := ws.doRequest(r)
- ws.requesterCond.L.Unlock()
- if err != nil && !errors.Is(err, context.Canceled) {
- ws.peer.logger.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
- }
- restart = true
- if errors.Is(err, webseed.ErrTooFast) {
- time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
+ for reqIndex := range ws.peer.requestState.Requests.Iterator() {
+ if !ws.requestIteratorLocked(i, reqIndex) {
+ goto start
}
- // Demeter is throwing a tantrum on Mount Olympus for this
- ws.peer.t.cl.locker().RLock()
- duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
- ws.peer.t.cl.locker().RUnlock()
- time.Sleep(duration)
- ws.requesterCond.L.Lock()
- return false
- })
- if restart {
- goto start
}
+ // Found no requests to handle, so wait.
ws.requesterCond.Wait()
}
}