From a2c50ea2bd98144cd8ea37dffec015e745ac3189 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 6 Dec 2021 15:14:59 +1100 Subject: [PATCH] Handle 503 returns from webseed peer endpoints --- torrent.go | 10 +++++---- webseed-peer.go | 55 +++++++++++++++++++++++++++++++---------------- webseed/client.go | 5 +++++ 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/torrent.go b/torrent.go index 2550f539..dfc56a32 100644 --- a/torrent.go +++ b/torrent.go @@ -2200,10 +2200,12 @@ func (t *Torrent) addWebSeed(url string) { if _, ok := t.webSeeds[url]; ok { return } - // I don't think Go http supports pipelining requests. However we can have more ready to go + // I don't think Go http supports pipelining requests. However, we can have more ready to go // right away. This value should be some multiple of the number of connections to a host. I - // would expect that double maxRequests plus a bit would be appropriate. - const maxRequests = 32 + // would expect that double maxRequests plus a bit would be appropriate. This value is based on + // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from + // "https://webtorrent.io/torrents/". + const maxRequests = 16 ws := webseedPeer{ peer: Peer{ t: t, @@ -2228,7 +2230,7 @@ func (t *Torrent) addWebSeed(url string) { ws.peer.initUpdateRequestsTimer() ws.requesterCond.L = t.cl.locker() for i := 0; i < maxRequests; i += 1 { - go ws.requester() + go ws.requester(i) } for _, f := range t.callbacks().NewPeer { f(&ws.peer) diff --git a/webseed-peer.go b/webseed-peer.go index 77c9533e..4921ab72 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "math/rand" "sync" + "time" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/log" @@ -69,18 +71,19 @@ func (ws *webseedPeer) _request(r Request) bool { return true } -func (ws *webseedPeer) doRequest(r Request) { +func (ws *webseedPeer) doRequest(r Request) error { webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) ws.activeRequests[r] = webseedRequest - func() { + err := func() error { ws.requesterCond.L.Unlock() defer ws.requesterCond.L.Lock() - ws.requestResultHandler(r, webseedRequest) + return ws.requestResultHandler(r, webseedRequest) }() delete(ws.activeRequests, r) + return err } -func (ws *webseedPeer) requester() { +func (ws *webseedPeer) requester(i int) { ws.requesterCond.L.Lock() defer ws.requesterCond.L.Unlock() start: @@ -91,8 +94,16 @@ start: if _, ok := ws.activeRequests[r]; ok { return true } - ws.doRequest(r) + err := ws.doRequest(r) + ws.requesterCond.L.Unlock() + if err != nil { + log.Printf("requester %v: error doing webseed request %v: %v", i, r, err) + } restart = true + if errors.Is(err, webseed.ErrTooFast) { + time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) + } + ws.requesterCond.L.Lock() return false }) if restart { @@ -125,7 +136,7 @@ func (ws *webseedPeer) onClose() { ws.requesterCond.Broadcast() } -func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { +func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error { result := <-webseedRequest.Result close(webseedRequest.Result) // one-shot // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not @@ -139,10 +150,15 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() if ws.peer.t.closed.IsSet() { - return + return nil } - if result.Err != nil { - if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() { + err := result.Err + if err != nil { + switch { + case errors.Is(err, context.Canceled): + case errors.Is(err, webseed.ErrTooFast): + case ws.peer.closed.IsSet(): + default: ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) // // Here lies my attempt to extract something concrete from Go's error system. RIP. // cfg := spew.NewDefaultConfig() @@ -152,17 +168,18 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re ws.peer.close() } ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) - } else { - err := ws.peer.receiveChunk(&pp.Message{ - Type: pp.Piece, - Index: r.Index, - Begin: r.Begin, - Piece: result.Bytes, - }) - if err != nil { - panic(err) - } + return err + } + err = ws.peer.receiveChunk(&pp.Message{ + Type: pp.Piece, + Index: r.Index, + Begin: r.Begin, + Piece: result.Bytes, + }) + if err != nil { + panic(err) } + return err } func (me *webseedPeer) isLowOnRequests() bool { diff --git a/webseed/client.go b/webseed/client.go index 5752575d..ff246a1c 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -3,6 +3,7 @@ package webseed import ( "bytes" "context" + "errors" "fmt" "io" "log" @@ -162,6 +163,8 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error } else { return ErrBadResponse{"resp status ok but requested range", result.resp} } + case http.StatusServiceUnavailable: + return ErrTooFast default: return ErrBadResponse{ fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode), @@ -170,6 +173,8 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error } } +var ErrTooFast = errors.New("making requests too fast") + func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) { var buf bytes.Buffer for _, part := range parts { -- 2.48.1