]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Handle 503 returns from webseed peer endpoints
authorMatt Joiner <anacrolix@gmail.com>
Mon, 6 Dec 2021 04:14:59 +0000 (15:14 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 6 Dec 2021 04:24:28 +0000 (15:24 +1100)
torrent.go
webseed-peer.go
webseed/client.go

index 2550f539f944b11ad8370b0f70a72c6008625ee6..dfc56a325dd37cca775ec88c1960fe7ca50f168d 100644 (file)
@@ -2200,10 +2200,12 @@ func (t *Torrent) addWebSeed(url string) {
        if _, ok := t.webSeeds[url]; ok {
                return
        }
-       // I don't think Go http supports pipelining requests. However we can have more ready to go
+       // I don't think Go http supports pipelining requests. However, we can have more ready to go
        // right away. This value should be some multiple of the number of connections to a host. I
-       // would expect that double maxRequests plus a bit would be appropriate.
-       const maxRequests = 32
+       // would expect that double maxRequests plus a bit would be appropriate. This value is based on
+       // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
+       // "https://webtorrent.io/torrents/".
+       const maxRequests = 16
        ws := webseedPeer{
                peer: Peer{
                        t:                        t,
@@ -2228,7 +2230,7 @@ func (t *Torrent) addWebSeed(url string) {
        ws.peer.initUpdateRequestsTimer()
        ws.requesterCond.L = t.cl.locker()
        for i := 0; i < maxRequests; i += 1 {
-               go ws.requester()
+               go ws.requester(i)
        }
        for _, f := range t.callbacks().NewPeer {
                f(&ws.peer)
index 77c9533e3fcc356bede3764afdd0752b99079844..4921ab727e2676c56463207ab34f36f9bc0f3b12 100644 (file)
@@ -4,7 +4,9 @@ import (
        "context"
        "errors"
        "fmt"
+       "math/rand"
        "sync"
+       "time"
 
        "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/log"
@@ -69,18 +71,19 @@ func (ws *webseedPeer) _request(r Request) bool {
        return true
 }
 
-func (ws *webseedPeer) doRequest(r Request) {
+func (ws *webseedPeer) doRequest(r Request) error {
        webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
        ws.activeRequests[r] = webseedRequest
-       func() {
+       err := func() error {
                ws.requesterCond.L.Unlock()
                defer ws.requesterCond.L.Lock()
-               ws.requestResultHandler(r, webseedRequest)
+               return ws.requestResultHandler(r, webseedRequest)
        }()
        delete(ws.activeRequests, r)
+       return err
 }
 
-func (ws *webseedPeer) requester() {
+func (ws *webseedPeer) requester(i int) {
        ws.requesterCond.L.Lock()
        defer ws.requesterCond.L.Unlock()
 start:
@@ -91,8 +94,16 @@ start:
                        if _, ok := ws.activeRequests[r]; ok {
                                return true
                        }
-                       ws.doRequest(r)
+                       err := ws.doRequest(r)
+                       ws.requesterCond.L.Unlock()
+                       if err != nil {
+                               log.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
+                       }
                        restart = true
+                       if errors.Is(err, webseed.ErrTooFast) {
+                               time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
+                       }
+                       ws.requesterCond.L.Lock()
                        return false
                })
                if restart {
@@ -125,7 +136,7 @@ func (ws *webseedPeer) onClose() {
        ws.requesterCond.Broadcast()
 }
 
-func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
        result := <-webseedRequest.Result
        close(webseedRequest.Result) // one-shot
        // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
@@ -139,10 +150,15 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
        ws.peer.t.cl.lock()
        defer ws.peer.t.cl.unlock()
        if ws.peer.t.closed.IsSet() {
-               return
+               return nil
        }
-       if result.Err != nil {
-               if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
+       err := result.Err
+       if err != nil {
+               switch {
+               case errors.Is(err, context.Canceled):
+               case errors.Is(err, webseed.ErrTooFast):
+               case ws.peer.closed.IsSet():
+               default:
                        ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
                        // // Here lies my attempt to extract something concrete from Go's error system. RIP.
                        // cfg := spew.NewDefaultConfig()
@@ -152,17 +168,18 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        ws.peer.close()
                }
                ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
-       } else {
-               err := ws.peer.receiveChunk(&pp.Message{
-                       Type:  pp.Piece,
-                       Index: r.Index,
-                       Begin: r.Begin,
-                       Piece: result.Bytes,
-               })
-               if err != nil {
-                       panic(err)
-               }
+               return err
+       }
+       err = ws.peer.receiveChunk(&pp.Message{
+               Type:  pp.Piece,
+               Index: r.Index,
+               Begin: r.Begin,
+               Piece: result.Bytes,
+       })
+       if err != nil {
+               panic(err)
        }
+       return err
 }
 
 func (me *webseedPeer) isLowOnRequests() bool {
index 5752575de9065403249437e82bdab4ebb5fcf305..ff246a1cdc5c79a19bc4f15c84587166a61f844e 100644 (file)
@@ -3,6 +3,7 @@ package webseed
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io"
        "log"
@@ -162,6 +163,8 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
                } else {
                        return ErrBadResponse{"resp status ok but requested range", result.resp}
                }
+       case http.StatusServiceUnavailable:
+               return ErrTooFast
        default:
                return ErrBadResponse{
                        fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
@@ -170,6 +173,8 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
        }
 }
 
+var ErrTooFast = errors.New("making requests too fast")
+
 func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
        var buf bytes.Buffer
        for _, part := range parts {