import (
"bytes"
"context"
+ "errors"
"fmt"
"io"
"log"
"strings"
"github.com/RoaringBitmap/roaring"
+
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
req *http.Request
e segments.Extent
result chan requestPartResult
+ start func()
+ // Wrap http response bodies for such things as download rate limiting.
+ responseBodyWrapper ResponseBodyWrapper
}
type Request struct {
	// given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
	// level. We can map our file-level adjustments to the pieces here. This probably need to be
	// private in the future, if Client ever starts removing pieces.
	Pieces roaring.Bitmap
	// Applied to each HTTP response body, e.g. to impose download rate limiting.
	ResponseBodyWrapper ResponseBodyWrapper
	// Escapes path components when building webseed request URLs.
	// NOTE(review): presumably nil falls back to default URL escaping — confirm against newRequest.
	PathEscaper PathEscaper
}
// ResponseBodyWrapper wraps HTTP response bodies, for such things as
// download rate limiting.
type ResponseBodyWrapper func(io.Reader) io.Reader
func (me *Client) SetInfo(info *metainfo.Info) {
if !strings.HasSuffix(me.Url, "/") && info.IsDir() {
// In my experience, this is a non-conforming webseed. For example the
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
- req, err := NewRequest(ws.Url, i, ws.info, e.Start, e.Length)
+ req, err := newRequest(
+ ws.Url, i, ws.info, e.Start, e.Length,
+ ws.PathEscaper,
+ )
if err != nil {
panic(err)
}
req = req.WithContext(ctx)
part := requestPart{
- req: req,
- result: make(chan requestPartResult, 1),
- e: e,
+ req: req,
+ result: make(chan requestPartResult, 1),
+ e: e,
+ responseBodyWrapper: ws.ResponseBodyWrapper,
+ }
+ part.start = func() {
+ go func() {
+ resp, err := ws.HttpClient.Do(req)
+ part.result <- requestPartResult{
+ resp: resp,
+ err: err,
+ }
+ }()
}
- go func() {
- resp, err := ws.HttpClient.Do(req)
- part.result <- requestPartResult{
- resp: resp,
- err: err,
- }
- }()
requestParts = append(requestParts, part)
return true
}) {
func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
result := <-part.result
+ // Make sure there's no further results coming, it should be a one-shot channel.
+ close(part.result)
if result.err != nil {
return result.err
}
defer result.resp.Body.Close()
+ var body io.Reader = result.resp.Body
+ if part.responseBodyWrapper != nil {
+ body = part.responseBodyWrapper(body)
+ }
+ // Prevent further accidental use
+ result.resp.Body = nil
if ctx.Err() != nil {
return ctx.Err()
}
switch result.resp.StatusCode {
case http.StatusPartialContent:
- copied, err := io.Copy(buf, result.resp.Body)
+ copied, err := io.Copy(buf, body)
if err != nil {
return err
}
// body. I don't know how one would handle multiple chunk requests resulting in an OK
// response for the same file. The request algorithm might be need to be smarter for
// that.
- discarded, _ := io.CopyN(io.Discard, result.resp.Body, part.e.Start)
+ discarded, _ := io.CopyN(io.Discard, body, part.e.Start)
if discarded != 0 {
log.Printf("discarded %v bytes in webseed request response part", discarded)
}
- _, err := io.CopyN(buf, result.resp.Body, part.e.Length)
+ _, err := io.CopyN(buf, body, part.e.Length)
return err
} else {
return ErrBadResponse{"resp status ok but requested range", result.resp}
}
+ case http.StatusServiceUnavailable:
+ return ErrTooFast
default:
return ErrBadResponse{
fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
}
}
-func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
// ErrTooFast is returned when the server replies with
// http.StatusServiceUnavailable, taken to mean requests are being made too
// frequently and should be backed off.
var ErrTooFast = errors.New("making requests too fast")
+func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
var buf bytes.Buffer
- firstErr := make(chan error, 1)
- go func() {
- for _, part := range parts {
- err := recvPartResult(ctx, &buf, part)
- if err != nil {
- // Ensure no further unnecessary response reads occur.
- cancel()
- select {
- case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err):
- default:
- }
- }
- }
- select {
- case firstErr <- nil:
- default:
+ for _, part := range parts {
+ part.start()
+ err = recvPartResult(ctx, &buf, part)
+ if err != nil {
+ err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
+ break
}
- }()
- // This can't be merged into the return statement, because buf.Bytes is called first!
- err := <-firstErr
+ }
return buf.Bytes(), err
}