Sergey Matveev's repositories - btrtrc.git/blobdiff - webseed/client.go
Drop support for go 1.20
[btrtrc.git] / webseed / client.go
index 87090e21ed8fecc85cd4296189e28de9dc74d7e2..ac42b8a427c3d6fe369f159e125611ffda434aa8 100644 (file)
@@ -3,6 +3,7 @@ package webseed
 import (
        "bytes"
        "context"
+       "errors"
        "fmt"
        "io"
        "log"
@@ -10,6 +11,7 @@ import (
        "strings"
 
        "github.com/RoaringBitmap/roaring"
+
        "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/segments"
@@ -26,6 +28,9 @@ type requestPart struct {
        req    *http.Request
        e      segments.Extent
        result chan requestPartResult
+       start  func()
+       // Wrap http response bodies for such things as download rate limiting.
+       responseBodyWrapper ResponseBodyWrapper
 }
 
 type Request struct {
@@ -46,9 +51,13 @@ type Client struct {
        // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
        // level. We can map our file-level adjustments to the pieces here. This probably need to be
        // private in the future, if Client ever starts removing pieces.
-       Pieces roaring.Bitmap
+       Pieces              roaring.Bitmap
+       ResponseBodyWrapper ResponseBodyWrapper
+       PathEscaper         PathEscaper
 }
 
+type ResponseBodyWrapper func(io.Reader) io.Reader
+
 func (me *Client) SetInfo(info *metainfo.Info) {
        if !strings.HasSuffix(me.Url, "/") && info.IsDir() {
                // In my experience, this is a non-conforming webseed. For example the
@@ -69,23 +78,29 @@ func (ws *Client) NewRequest(r RequestSpec) Request {
        ctx, cancel := context.WithCancel(context.Background())
        var requestParts []requestPart
        if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
-               req, err := NewRequest(ws.Url, i, ws.info, e.Start, e.Length)
+               req, err := newRequest(
+                       ws.Url, i, ws.info, e.Start, e.Length,
+                       ws.PathEscaper,
+               )
                if err != nil {
                        panic(err)
                }
                req = req.WithContext(ctx)
                part := requestPart{
-                       req:    req,
-                       result: make(chan requestPartResult, 1),
-                       e:      e,
+                       req:                 req,
+                       result:              make(chan requestPartResult, 1),
+                       e:                   e,
+                       responseBodyWrapper: ws.ResponseBodyWrapper,
+               }
+               part.start = func() {
+                       go func() {
+                               resp, err := ws.HttpClient.Do(req)
+                               part.result <- requestPartResult{
+                                       resp: resp,
+                                       err:  err,
+                               }
+                       }()
                }
-               go func() {
-                       resp, err := ws.HttpClient.Do(req)
-                       part.result <- requestPartResult{
-                               resp: resp,
-                               err:  err,
-                       }
-               }()
                requestParts = append(requestParts, part)
                return true
        }) {
@@ -116,16 +131,24 @@ func (me ErrBadResponse) Error() string {
 
 func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
        result := <-part.result
+       // Make sure there are no further results coming; it should be a one-shot channel.
+       close(part.result)
        if result.err != nil {
                return result.err
        }
        defer result.resp.Body.Close()
+       var body io.Reader = result.resp.Body
+       if part.responseBodyWrapper != nil {
+               body = part.responseBodyWrapper(body)
+       }
+       // Prevent further accidental use
+       result.resp.Body = nil
        if ctx.Err() != nil {
                return ctx.Err()
        }
        switch result.resp.StatusCode {
        case http.StatusPartialContent:
-               copied, err := io.Copy(buf, result.resp.Body)
+               copied, err := io.Copy(buf, body)
                if err != nil {
                        return err
                }
@@ -139,16 +162,26 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
                // archive.org might be using a webserver implementation that refuses to do partial
                // responses to small files.
                if part.e.Start < 48<<10 {
-                       log.Printf("resp status ok but requested range [url=%q, range=%q]", part.req.URL, part.req.Header.Get("Range"))
-                       discarded, _ := io.CopyN(io.Discard, result.resp.Body, part.e.Start)
+                       if part.e.Start != 0 {
+                               log.Printf("resp status ok but requested range [url=%q, range=%q]",
+                                       part.req.URL,
+                                       part.req.Header.Get("Range"))
+                       }
+                       // Instead of discarding, we could try receiving all the chunks present in the response
+                       // body. I don't know how one would handle multiple chunk requests resulting in an OK
+                       // response for the same file. The request algorithm might need to be smarter for
+                       // that.
+                       discarded, _ := io.CopyN(io.Discard, body, part.e.Start)
                        if discarded != 0 {
                                log.Printf("discarded %v bytes in webseed request response part", discarded)
                        }
-                       _, err := io.CopyN(buf, result.resp.Body, part.e.Length)
+                       _, err := io.CopyN(buf, body, part.e.Length)
                        return err
                } else {
                        return ErrBadResponse{"resp status ok but requested range", result.resp}
                }
+       case http.StatusServiceUnavailable:
+               return ErrTooFast
        default:
                return ErrBadResponse{
                        fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
@@ -157,29 +190,17 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
        }
 }
 
-func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) {
-       ctx, cancel := context.WithCancel(ctx)
-       defer cancel()
+var ErrTooFast = errors.New("making requests too fast")
+
+func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
        var buf bytes.Buffer
-       firstErr := make(chan error, 1)
-       go func() {
-               for _, part := range parts {
-                       err := recvPartResult(ctx, &buf, part)
-                       if err != nil {
-                               // Ensure no further unnecessary response reads occur.
-                               cancel()
-                               select {
-                               case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err):
-                               default:
-                               }
-                       }
-               }
-               select {
-               case firstErr <- nil:
-               default:
+       for _, part := range parts {
+               part.start()
+               err = recvPartResult(ctx, &buf, part)
+               if err != nil {
+                       err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
+                       break
                }
-       }()
-       // This can't be merged into the return statement, because buf.Bytes is called first!
-       err := <-firstErr
+       }
        return buf.Bytes(), err
 }