]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix errors in webseed causing very long stalls in requesting
authorMatt Joiner <anacrolix@gmail.com>
Tue, 27 Aug 2024 01:32:38 +0000 (11:32 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 27 Aug 2024 01:32:38 +0000 (11:32 +1000)
Reuse the "too fast" error handling for all errors when requesting from webseeds. This prevents long stalls due to common errors.

go.mod
ordered-bitmap.go
request-strategy/peer.go
tests/webseed-partial-seed/go.mod
webseed-peer.go
webseed/client.go

diff --git a/go.mod b/go.mod
index 05ca5af7d5215eee45d3df036da9faa70babc303..bcbd7caccb6fe63a144beb2c80337bc4c7805678 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/anacrolix/torrent
 
-go 1.22
+go 1.23
 
 require (
        github.com/RoaringBitmap/roaring v1.2.3
index 741067168431032cec7b71d2aa9ac98079a98afb..f1d867ef1ce1be1342a238d43125984a3e7e977c 100644 (file)
@@ -3,6 +3,7 @@ package torrent
 import (
        g "github.com/anacrolix/generics"
        list "github.com/bahlo/generic-list-go"
+       "iter"
 
        "github.com/anacrolix/torrent/typed-roaring"
 )
@@ -41,12 +42,24 @@ func (o *orderedBitmap[T]) Rank(index T) uint64 {
        return o.bitmap.Rank(index)
 }
 
-func (o *orderedBitmap[T]) Iterate(f func(T) bool) {
+func (o *orderedBitmap[T]) Iterate(f func(T) bool) (all bool) {
        for e := o.order.Front(); e != nil; e = e.Next() {
                if !f(e.Value) {
                        return
                }
        }
+       all = true
+       return
+}
+
+func (o *orderedBitmap[T]) Iterator() iter.Seq[T] {
+       return func(yield func(T) bool) {
+               for e := o.order.Front(); e != nil; e = e.Next() {
+                       if !yield(e.Value) {
+                               return
+                       }
+               }
+       }
 }
 
 func (o *orderedBitmap[T]) CheckedRemove(index T) bool {
index 4176188cebb8a766f6e46e4ee62dbcb5b5dab5bb..a922feb31066ab8f822597b73b9529f8ad51d38c 100644 (file)
@@ -2,6 +2,7 @@ package requestStrategy
 
 import (
        typedRoaring "github.com/anacrolix/torrent/typed-roaring"
+       "iter"
 )
 
 type PeerRequestState struct {
@@ -24,7 +25,9 @@ type PeerRequests interface {
        // See roaring.Bitmap.Rank.
        Rank(RequestIndex) uint64
        // Must yield in order items were added.
-       Iterate(func(RequestIndex) bool)
+       Iterate(func(RequestIndex) bool) (all bool)
+       // Must yield in order items were added.
+       Iterator() iter.Seq[RequestIndex]
        // See roaring.Bitmap.CheckedRemove.
        CheckedRemove(RequestIndex) bool
        // Iterate a snapshot of the values. It is safe to mutate the underlying data structure.
index d7d52bafc2aabffaef2e0abba50a265f0292b929..98e28918082d74697d26d9296655c533211050d6 100644 (file)
@@ -1,6 +1,6 @@
 module github.com/anacrolix/torrent/tests/webseed-partial-seed
 
-go 1.22.3
+go 1.23
 
 require (
        github.com/anacrolix/torrent v1.56.1
index ca915f386116f13bc0ae26976bb305cdb3563992..c71691dece59b4985d0a8c5620ab21b6d578267b 100644 (file)
@@ -17,7 +17,6 @@ import (
 )
 
 const (
-       webseedPeerUnhandledErrorSleep   = 5 * time.Second
        webseedPeerCloseOnUnhandledError = false
 )
 
@@ -76,8 +75,14 @@ func (ws *webseedPeer) _request(r Request) bool {
        return true
 }
 
-func (ws *webseedPeer) doRequest(r Request) error {
-       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+// Returns true if we should look for another request to start. Returns false if we handled this
+// one.
+func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool {
+       r := ws.peer.t.requestIndexToRequest(x)
+       if _, ok := ws.activeRequests[r]; ok {
+               return true
+       }
+       webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
        ws.activeRequests[r] = webseedRequest
        err := func() error {
                ws.requesterCond.L.Unlock()
@@ -85,7 +90,24 @@ func (ws *webseedPeer) doRequest(r Request) error {
                return ws.requestResultHandler(r, webseedRequest)
        }()
        delete(ws.activeRequests, r)
-       return err
+       ws.requesterCond.L.Unlock()
+       defer ws.requesterCond.L.Lock()
+       if err != nil {
+               level := log.Warning
+               if errors.Is(err, context.Canceled) {
+                       level = log.Debug
+               }
+               ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err)
+               // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down
+               // any kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing
+               // around it doesn't hurt to slow a few down if there are issues.
+               select {
+               case <-ws.peer.closed.Done():
+               case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+               }
+       }
+       return false
+
 }
 
 func (ws *webseedPeer) requester(i int) {
@@ -93,33 +115,12 @@ func (ws *webseedPeer) requester(i int) {
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
-               // Restart is set if we don't need to wait for the requestCond before trying again.
-               restart := false
-               ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
-                       r := ws.peer.t.requestIndexToRequest(x)
-                       if _, ok := ws.activeRequests[r]; ok {
-                               return true
-                       }
-                       err := ws.doRequest(r)
-                       ws.requesterCond.L.Unlock()
-                       if err != nil && !errors.Is(err, context.Canceled) {
-                               ws.peer.logger.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
-                       }
-                       restart = true
-                       if errors.Is(err, webseed.ErrTooFast) {
-                               time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
+               for reqIndex := range ws.peer.requestState.Requests.Iterator() {
+                       if !ws.requestIteratorLocked(i, reqIndex) {
+                               goto start
                        }
-                       // Demeter is throwing a tantrum on Mount Olympus for this
-                       ws.peer.t.cl.locker().RLock()
-                       duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
-                       ws.peer.t.cl.locker().RUnlock()
-                       time.Sleep(duration)
-                       ws.requesterCond.L.Lock()
-                       return false
-               })
-               if restart {
-                       goto start
                }
+               // Found no requests to handle, so wait.
                ws.requesterCond.Wait()
        }
 }
index c58f7124d9e479a16298588825769a564d5f953b..e6dce7a80cb7b4aa975121f0fcb3326b5ac7b287 100644 (file)
@@ -68,8 +68,8 @@ type RequestResult struct {
        Err   error
 }
 
-func (ws *Client) NewRequest(r RequestSpec) Request {
-       ctx, cancel := context.WithCancel(context.Background())
+func (ws *Client) StartNewRequest(r RequestSpec) Request {
+       ctx, cancel := context.WithCancel(context.TODO())
        var requestParts []requestPart
        if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
                req, err := newRequest(