From b3aea1a6212a5e1b1441ab04327dbbeabf89596b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 27 Aug 2024 11:32:38 +1000 Subject: [PATCH] Fix errors in webseed causing very long stalls in requesting Reuse the "too fast" error handling for all errors when requesting from webseeds. This prevents long stalls due to common errors. --- go.mod | 2 +- ordered-bitmap.go | 15 +++++++- request-strategy/peer.go | 5 ++- tests/webseed-partial-seed/go.mod | 2 +- webseed-peer.go | 59 ++++++++++++++++--------------- webseed/client.go | 4 +-- 6 files changed, 52 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index 05ca5af7..bcbd7cac 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/anacrolix/torrent -go 1.22 +go 1.23 require ( github.com/RoaringBitmap/roaring v1.2.3 diff --git a/ordered-bitmap.go b/ordered-bitmap.go index 74106716..f1d867ef 100644 --- a/ordered-bitmap.go +++ b/ordered-bitmap.go @@ -3,6 +3,7 @@ package torrent import ( g "github.com/anacrolix/generics" list "github.com/bahlo/generic-list-go" + "iter" "github.com/anacrolix/torrent/typed-roaring" ) @@ -41,12 +42,24 @@ func (o *orderedBitmap[T]) Rank(index T) uint64 { return o.bitmap.Rank(index) } -func (o *orderedBitmap[T]) Iterate(f func(T) bool) { +func (o *orderedBitmap[T]) Iterate(f func(T) bool) (all bool) { for e := o.order.Front(); e != nil; e = e.Next() { if !f(e.Value) { return } } + all = true + return +} + +func (o *orderedBitmap[T]) Iterator() iter.Seq[T] { + return func(yield func(T) bool) { + for e := o.order.Front(); e != nil; e = e.Next() { + if !yield(e.Value) { + return + } + } + } } func (o *orderedBitmap[T]) CheckedRemove(index T) bool { diff --git a/request-strategy/peer.go b/request-strategy/peer.go index 4176188c..a922feb3 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -2,6 +2,7 @@ package requestStrategy import ( typedRoaring "github.com/anacrolix/torrent/typed-roaring" + "iter" ) type PeerRequestState struct { @@ -24,7 +25,9 @@ type PeerRequests interface { // See roaring.Bitmap.Rank. Rank(RequestIndex) uint64 // Must yield in order items were added. - Iterate(func(RequestIndex) bool) + Iterate(func(RequestIndex) bool) (all bool) + // Must yield in order items were added. + Iterator() iter.Seq[RequestIndex] // See roaring.Bitmap.CheckedRemove. CheckedRemove(RequestIndex) bool // Iterate a snapshot of the values. It is safe to mutate the underlying data structure. diff --git a/tests/webseed-partial-seed/go.mod b/tests/webseed-partial-seed/go.mod index d7d52baf..98e28918 100644 --- a/tests/webseed-partial-seed/go.mod +++ b/tests/webseed-partial-seed/go.mod @@ -1,6 +1,6 @@ module github.com/anacrolix/torrent/tests/webseed-partial-seed -go 1.22.3 +go 1.23 require ( github.com/anacrolix/torrent v1.56.1 diff --git a/webseed-peer.go b/webseed-peer.go index ca915f38..c71691de 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -17,7 +17,6 @@ import ( ) const ( - webseedPeerUnhandledErrorSleep = 5 * time.Second webseedPeerCloseOnUnhandledError = false ) @@ -76,8 +75,14 @@ func (ws *webseedPeer) _request(r Request) bool { return true } -func (ws *webseedPeer) doRequest(r Request) error { - webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) +// Returns true if we should look for another request to start. Returns false if we handled this +// one. +func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool { + r := ws.peer.t.requestIndexToRequest(x) + if _, ok := ws.activeRequests[r]; ok { + return true + } + webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r)) ws.activeRequests[r] = webseedRequest err := func() error { ws.requesterCond.L.Unlock() @@ -85,7 +90,24 @@ func (ws *webseedPeer) doRequest(r Request) error { return ws.requestResultHandler(r, webseedRequest) }() delete(ws.activeRequests, r) - return err + ws.requesterCond.L.Unlock() + defer ws.requesterCond.L.Lock() + if err != nil { + level := log.Warning + if errors.Is(err, context.Canceled) { + level = log.Debug + } + ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err) + // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down + // any kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing + // around it doesn't hurt to slow a few down if there are issues. + select { + case <-ws.peer.closed.Done(): + case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): + } + } + return false + } func (ws *webseedPeer) requester(i int) { @@ -93,33 +115,12 @@ func (ws *webseedPeer) requester(i int) { defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { - // Restart is set if we don't need to wait for the requestCond before trying again. - restart := false - ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { - r := ws.peer.t.requestIndexToRequest(x) - if _, ok := ws.activeRequests[r]; ok { - return true - } - err := ws.doRequest(r) - ws.requesterCond.L.Unlock() - if err != nil && !errors.Is(err, context.Canceled) { - ws.peer.logger.Printf("requester %v: error doing webseed request %v: %v", i, r, err) - } - restart = true - if errors.Is(err, webseed.ErrTooFast) { - time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) + for reqIndex := range ws.peer.requestState.Requests.Iterator() { + if !ws.requestIteratorLocked(i, reqIndex) { + goto start } - // Demeter is throwing a tantrum on Mount Olympus for this - ws.peer.t.cl.locker().RLock() - duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)) - ws.peer.t.cl.locker().RUnlock() - time.Sleep(duration) - ws.requesterCond.L.Lock() - return false - }) - if restart { - goto start } + // Found no requests to handle, so wait. ws.requesterCond.Wait() } } diff --git a/webseed/client.go b/webseed/client.go index c58f7124..e6dce7a8 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -68,8 +68,8 @@ type RequestResult struct { Err error } -func (ws *Client) NewRequest(r RequestSpec) Request { - ctx, cancel := context.WithCancel(context.Background()) +func (ws *Client) StartNewRequest(r RequestSpec) Request { + ctx, cancel := context.WithCancel(context.TODO()) var requestParts []requestPart if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool { req, err := newRequest( -- 2.48.1