// How long between writes before sending a keep alive message on a peer connection that we want
// to maintain.
KeepAliveTimeout time.Duration
- // Maximum bytes to buffer per peer connection for peer request data before it is sent.
+ // Maximum bytes to buffer per peer connection for peer request data before it is sent. This
+ // must be >= the request chunk size from peers.
MaxAllocPeerRequestDataPerConn int
// The IP addresses as our peers should see them. May differ from the
package torrent
+
+import (
+ "golang.org/x/exp/constraints"
+)
+
+func intCeilDiv[T constraints.Integer](a, b T) T {
+ // This still sux for negative numbers due to truncating division. But I don't know that we need
+ // it, or that ceil division makes sense for negative numbers.
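+ // E.g. intCeilDiv(5, 2) == 3 and intCeilDiv(4, 2) == 2.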
+ return (a + b - 1) / b
+}
return rip.To4() == nil && rip.To16() != nil
}
-func maxInt(as ...int) int {
- ret := as[0]
- for _, a := range as[1:] {
- if a > ret {
- ret = a
- }
- }
- return ret
-}
-
-func minInt(as ...int) int {
- ret := as[0]
- for _, a := range as[1:] {
- if a < ret {
- ret = a
- }
- }
- return ret
-}
-
var unlimited = rate.NewLimiter(rate.Inf, 0)
type (
"github.com/anacrolix/torrent/metainfo"
)
-// Contains implementation details that differ between peer types, like Webseeds and regular
+// Contains implementation details that differ between peer types, like WebSeeds and regular
// BitTorrent protocol connections. These methods are embedded in the child types of Peer for legacy
// expectations that they exist on the child type. Some methods are underlined to avoid collisions
// with legacy PeerConn methods. New methods and calls that are fixed up should be migrated over to
// Abstract methods implemented by subclasses of Peer.
type newHotPeerImpl interface {
lastWriteUploadRate() float64
+ // How many requests should be assigned to the peer.
+ nominalMaxRequests() maxRequests
}
lastChunkSent time.Time
// Stuff controlled by the local peer.
- needRequestUpdate updateRequestReason
+ needRequestUpdate updateRequestReason
+ // TODO: How are pending cancels handled for webseed peers?
requestState request_strategy.PeerRequestState
updateRequestsTimer *time.Timer
lastRequestUpdate time.Time
&cn._stats.ChunksWritten,
cn.requestState.Requests.GetCardinality(),
cn.requestState.Cancelled.GetCardinality(),
- cn.nominalMaxRequests(),
+ cn.peerImpl.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
localClientReqq,
)
// The actual value to use as the maximum outbound requests.
-func (cn *Peer) nominalMaxRequests() maxRequests {
+func (cn *PeerConn) nominalMaxRequests() maxRequests {
// TODO: This should differ for webseeds...
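+ // E.g. with peakRequests == 10 this allows up to 20 in flight, clamped by PeerMaxRequests and
+ // maxLocalToRemoteRequests, and never less than 1.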
return max(1, min(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
}
return nil
}
-func (cn *PeerConn) mustRequest(r RequestIndex) bool {
+func (cn *Peer) mustRequest(r RequestIndex) bool {
more, err := cn.request(r)
if err != nil {
panic(err)
return more
}
-func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
+func (cn *Peer) request(r RequestIndex) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
if cn.requestState.Requests.Contains(r) {
return true, nil
}
- if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+ if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
cn.requestState.Requests.Add(r)
}
cn.validReceiveChunks[r]++
cn.t.requestState[r] = requestState{
- peer: &cn.Peer,
+ peer: cn,
when: time.Now(),
}
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
- f(PeerRequestEvent{&cn.Peer, ppReq})
+ f(PeerRequestEvent{cn, ppReq})
}
return cn.legacyPeerImpl._request(ppReq), nil
}
})
}
-func (me *PeerConn) _cancel(r RequestIndex) bool {
+func (me *PeerConn) handleCancel(r RequestIndex) {
me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
+}
+
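+// Whether cancelled requests will be acknowledged by the remote, for example by rejecting them as
+// the fast extension allows.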
+func (me *PeerConn) acksCancels() bool {
return me.remoteRejectsCancels()
}
case pp.Piece:
c.doChunkReadStats(int64(len(msg.Piece)))
err = c.receiveChunk(&msg)
- if len(msg.Piece) == int(t.chunkSize) {
- t.chunkPool.Put(&msg.Piece)
- }
+ t.putChunkBuffer(msg.Piece)
+ msg.Piece = nil
if err != nil {
err = fmt.Errorf("receiving chunk: %w", err)
}
proofLayers: m.ProofLayers,
}
}
+
+func (me *PeerConn) peerPtr() *Peer {
+ return &me.Peer
+}
tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes)
// End of part that should be timed.
remainingChunks := (numPieces - completed) * (pieceLength / chunkSize)
- c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt(
+ c.Assert(rs.Requests.requestIndexes, qt.HasLen, min(
remainingChunks,
int(cl.config.MaxUnverifiedBytes/chunkSize)))
}
// This gets the best-case request state. That means handling pieces limited by capacity, preferring
// earlier pieces, low availability etc. It pays no attention to existing requests on the peer or
// other peers. Those are handled later.
-func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
+func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
t := p.t
if !t.haveInfo() {
return
panic(since)
}
}
- p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
+ if p.t.cl.config.Debug {
+ p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
+ }
pprof.Do(
context.Background(),
pprof.Labels("update request", string(p.needRequestUpdate)),
// Transmit/action the request state to the peer. This includes work-stealing from other peers and
// some piece order randomization within the preferred state calculated earlier in next.
-func (p *PeerConn) applyRequestState(next desiredRequestState) {
+func (p *Peer) applyRequestState(next desiredRequestState) {
current := &p.requestState
// Make interest sticky
if !next.Interested && p.requestState.Interested {
break
}
numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality())
- if numPending >= p.nominalMaxRequests() {
+ if numPending >= p.peerImpl.nominalMaxRequests() {
break
}
req := heap.Pop(requestHeap)
panic("changed")
}
- // don't add requests on reciept of a reject - because this causes request back
- // to potentially permanently unresponive peers - which just adds network noise. If
+ // Don't add requests on receipt of a reject, because this sends requests back
+ // to potentially permanently unresponsive peers, which just adds network noise. If
// the peer can handle more requests it will send an "unchoked" message - which
// will cause it to get added back to the request queue
if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
}
}
+// The whole-torrent offset of the first byte of the request.
+func (t *Torrent) requestIndexBegin(r RequestIndex) int64 {
+ return t.requestOffset(t.requestIndexToRequest(r))
+}
+
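+// The whole-torrent offset just past the last byte of the request.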
+func (t *Torrent) requestIndexEnd(r RequestIndex) int64 {
+ req := t.requestIndexToRequest(r)
+ return t.requestOffset(req) + int64(req.Length)
+}
+
func (t *Torrent) requestOffset(r Request) int64 {
return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
}
func (t *Torrent) slogger() *slog.Logger {
return t._slogger
}
+
+// Get a chunk buffer from the pool. It should be returned when it's no longer in use. Do we
+// waste an allocation if we throw away the pointer it was stored with?
+func (t *Torrent) getChunkBuffer() []byte {
+ return *t.chunkPool.Get().(*[]byte)
+}
+
+func (t *Torrent) putChunkBuffer(b []byte) {
+ panicif.NotEq(cap(b), t.chunkSize.Int())
+ // Does this allocate? Are we amortizing against the cost of a large buffer?
+ t.chunkPool.Put(&b)
+}
+
+func (t *Torrent) withSlogger(base *slog.Logger) *slog.Logger {
+ return base.With(slog.Group(
+ "torrent",
+ "name", lazyLogValuer(func() any {
+ opt := t.bestName()
+ if opt.Ok {
+ return opt.Value
+ }
+ return nil
+ }),
+ "ih", *t.canonicalShortInfohash()))
+}
"context"
"errors"
"fmt"
+ "io"
"iter"
- "math/rand"
+ "log/slog"
+ "slices"
"sync"
"time"
"github.com/RoaringBitmap/roaring"
- "github.com/anacrolix/log"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/webseed"
)
-const (
- webseedPeerCloseOnUnhandledError = false
-)
-
type webseedPeer struct {
// First field for stats alignment.
peer Peer
client webseed.Client
- activeRequests map[Request]webseed.Request
+ activeRequests map[*webseedRequest]struct{}
locker sync.Locker
lastUnhandledErr time.Time
}
+func (me *webseedPeer) nominalMaxRequests() maxRequests {
+ // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
+ // we just allow 2 MiB worth of requests.
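+ // With the common 16 KiB chunk size that works out to 128 outstanding requests.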
+ return intCeilDiv(2<<20, me.peer.t.chunkSize.Int())
+}
+
func (me *webseedPeer) acksCancels() bool {
return false
}
func (me *webseedPeer) updateRequests() {
- //TODO implement me
- panic("implement me")
+ p := &me.peer
+ next := p.getDesiredRequestState()
+ p.applyRequestState(next)
+ p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
+ // Run this after all requests are applied to the peer, so they can be batched up.
+ me.spawnRequests()
}
func (me *webseedPeer) lastWriteUploadRate() float64 {
return true
}
-func (ws *webseedPeer) _cancel(r RequestIndex) bool {
- if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
- active.Cancel()
- // The requester is running and will handle the result.
- return true
+func (ws *webseedPeer) handleCancel(r RequestIndex) {
+ for wr := range ws.activeRequestsForIndex(r) {
+ wr.request.Cancel()
}
- // There should be no requester handling this, so no further events will occur.
- return false
}
-func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
- return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
+func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] {
+ return func(yield func(*webseedRequest) bool) {
+ for wr := range ws.activeRequests {
+ if r < wr.next || r >= wr.end {
+ continue
+ }
+ if !yield(wr) {
+ return
+ }
+ }
+ }
+}
+
+func (ws *webseedPeer) requestIndexTorrentOffset(r RequestIndex) int64 {
+ return ws.peer.t.requestIndexBegin(r)
+}
+
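+// Converts a [begin, end) request index range into the contiguous whole-torrent byte range
+// (offset and length) for a single webseed HTTP request.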
+func (ws *webseedPeer) intoSpec(begin, end RequestIndex) webseed.RequestSpec {
+ t := ws.peer.t
+ start := t.requestIndexBegin(begin)
+ endOff := t.requestIndexEnd(end - 1)
+ return webseed.RequestSpec{start, endOff - start}
}
func (ws *webseedPeer) _request(r Request) bool {
- ws.spawnRequests()
return true
}
-func (ws *webseedPeer) spawnReq(r Request) {
- webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
- ws.activeRequests[r] = webseedRequest
- go ws.doRequest(r, webseedRequest)
+func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) {
+ extWsReq := ws.client.StartNewRequest(ws.intoSpec(begin, end))
+ wsReq := webseedRequest{
+ request: extWsReq,
+ begin: begin,
+ next: begin,
+ end: end,
+ }
+ ws.activeRequests[&wsReq] = struct{}{}
+ ws.peer.logger.Slogger().Debug(
+ "starting webseed request",
+ "begin", begin,
+ "end", end,
+ "len", end-begin,
+ "avail", ws.peer.requestState.Requests.GetCardinality())
+ go ws.runRequest(&wsReq)
}
-func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
locker := ws.locker
- err := ws.requestResultHandler(r, webseedRequest)
+ err := ws.readChunks(webseedRequest)
+ // Ensure the body reader and response are closed.
+ webseedRequest.request.Cancel()
+ locker.Lock()
if err != nil {
- level := log.Warning
+ level := slog.LevelWarn
if errors.Is(err, context.Canceled) {
- level = log.Debug
- }
- ws.peer.logger.Levelf(level, "error doing webseed request %v: %v", r, err)
- // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
- // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
- // it doesn't hurt to slow a few down if there are issues.
- select {
- case <-ws.peer.closed.Done():
- case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+ level = slog.LevelDebug
+ } else {
+ panic(err)
}
+ ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err)
+ // // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
+ // // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
+ // // it doesn't hurt to slow a few down if there are issues.
+ // select {
+ // case <-ws.peer.closed.Done():
+ // case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+ // }
}
- locker.Lock()
// Delete this entry after waiting above on an error, to prevent more requests.
- delete(ws.activeRequests, r)
+ delete(ws.activeRequests, webseedRequest)
if err != nil {
ws.peer.onNeedUpdateRequests("webseedPeer request errored")
}
if !ok {
break
}
- ws.spawnReq(req)
+ end := seqLast(ws.iterConsecutiveInactiveRequests(req)).Unwrap()
+ ws.spawnRequest(req, end+1)
+ }
+}
+
+// Returns Some of the last item in an iter.Seq, or None if the sequence is empty.
+func seqLast[V any](seq iter.Seq[V]) (last g.Option[V]) {
+ for item := range seq {
+ last.Set(item)
+ }
+ return
+}
+
+func (ws *webseedPeer) iterConsecutiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
+ return func(yield func(RequestIndex) bool) {
+ for {
+ if !ws.peer.requestState.Requests.Contains(begin) {
+ return
+ }
+ if !yield(begin) {
+ return
+ }
+ begin++
+ }
}
}
-func (ws *webseedPeer) inactiveRequests() iter.Seq[Request] {
- return func(yield func(Request) bool) {
+func (ws *webseedPeer) iterConsecutiveInactiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
+ return func(yield func(RequestIndex) bool) {
+ for req := range ws.iterConsecutiveRequests(begin) {
+ if !ws.inactiveRequestIndex(req) {
+ return
+ }
+ if !yield(req) {
+ return
+ }
+ }
+ }
+}
+
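+// A request index is inactive if no running webseed request currently covers it.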
+func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool {
+ for range ws.activeRequestsForIndex(index) {
+ return false
+ }
+ return true
+}
+
+func (ws *webseedPeer) inactiveRequests() iter.Seq[RequestIndex] {
+ return func(yield func(RequestIndex) bool) {
+ sorted := slices.Sorted(ws.peer.requestState.Requests.Iterator())
+ if len(sorted) != 0 {
+ fmt.Println("inactiveRequests", sorted)
+ }
for reqIndex := range ws.peer.requestState.Requests.Iterator() {
- r := ws.peer.t.requestIndexToRequest(reqIndex)
- _, ok := ws.activeRequests[r]
- if !ok {
- if !yield(r) {
- return
- }
+ if !ws.inactiveRequestIndex(reqIndex) {
+ continue
+ }
+ if !yield(reqIndex) {
+ return
}
}
}
}
func (ws *webseedPeer) onClose() {
- ws.peer.logger.Levelf(log.Debug, "closing")
// Just deleting them means we would have to manually cancel active requests.
ws.peer.cancelAllRequests()
ws.peer.t.iterPeers(func(p *Peer) {
})
}
-func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
- result := <-webseedRequest.Result
- close(webseedRequest.Result) // one-shot
- // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
- // sure if we can divine which errors indicate cancellation on our end without hitting the
- // network though.
- if len(result.Bytes) != 0 || result.Err == nil {
- // Increment ChunksRead and friends
- ws.peer.doChunkReadStats(int64(len(result.Bytes)))
- }
- ws.peer.readBytes(int64(len(result.Bytes)))
- ws.peer.t.cl.lock()
- defer ws.peer.t.cl.unlock()
- if ws.peer.t.closed.IsSet() {
- return nil
- }
- err := result.Err
- if err != nil {
- switch {
- case errors.Is(err, context.Canceled):
- case errors.Is(err, webseed.ErrTooFast):
- case ws.peer.closed.IsSet():
- default:
- ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
- // // Here lies my attempt to extract something concrete from Go's error system. RIP.
- // cfg := spew.NewDefaultConfig()
- // cfg.DisableMethods = true
- // cfg.Dump(result.Err)
-
- if webseedPeerCloseOnUnhandledError {
- log.Printf("closing %v", ws)
- ws.peer.close()
- } else {
- ws.lastUnhandledErr = time.Now()
- }
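+// Reads each chunk in the request's range from the response body, handing each to receiveChunk
+// under the client lock. wr.next advances as chunks are consumed.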
+func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) {
+ t := ws.peer.t
+ buf := t.getChunkBuffer()
+ defer t.putChunkBuffer(buf)
+ for ; wr.next < wr.end; wr.next++ {
+ reqSpec := t.requestIndexToRequest(wr.next)
+ chunkLen := reqSpec.Length.Int()
+ buf = buf[:chunkLen]
+ var n int
+ n, err = io.ReadFull(wr.request.Body, buf)
+ ws.peer.readBytes(int64(n))
+ if err != nil {
+ err = fmt.Errorf("reading chunk: %w", err)
+ return
}
- if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
- panic("invalid reject")
+ ws.peer.doChunkReadStats(int64(chunkLen))
+ ws.peer.locker().Lock()
+ err = ws.peer.receiveChunk(&pp.Message{
+ Type: pp.Piece,
+ Piece: buf,
+ Index: reqSpec.Index,
+ Begin: reqSpec.Begin,
+ })
+ ws.peer.locker().Unlock()
+ if err != nil {
+ err = fmt.Errorf("processing chunk: %w", err)
+ return
}
- return err
}
- err = ws.peer.receiveChunk(&pp.Message{
- Type: pp.Piece,
- Index: r.Index,
- Begin: r.Begin,
- Piece: result.Bytes,
- })
- if err != nil {
- panic(err)
- }
- return err
+ return
}
+//
+//func (ws *webseedPeer) requestResultHandler(wr *webseedRequest) (err error) {
+// err = ws.readChunks(wr)
+// switch {
+// case err == nil:
+// case ws.peer.closed.IsSet():
+// case errors.Is(err, context.Canceled):
+// case errors.Is(err, webseed.ErrTooFast):
+// default:
+//
+// }
+// ws.peer.t.cl.lock()
+// defer ws.peer.t.cl.unlock()
+// if ws.peer.t.closed.IsSet() {
+// return nil
+// }
+// if err != nil {
+// switch {
+// case errors.Is(err, context.Canceled):
+// case errors.Is(err, webseed.ErrTooFast):
+// case ws.peer.closed.IsSet():
+// default:
+// ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
+// // // Here lies my attempt to extract something concrete from Go's error system. RIP.
+// // cfg := spew.NewDefaultConfig()
+// // cfg.DisableMethods = true
+// // cfg.Dump(result.Err)
+//
+// if webseedPeerCloseOnUnhandledError {
+// log.Printf("closing %v", ws)
+// ws.peer.close()
+// } else {
+// ws.lastUnhandledErr = time.Now()
+// }
+// }
+// if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+// panic("invalid reject")
+// }
+// return err
+// }
+// return err
+//}
+
func (me *webseedPeer) peerPieces() *roaring.Bitmap {
return &me.client.Pieces
}
}
return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
}
+
+func (me *webseedPeer) slogger() *slog.Logger {
+ return me.peer.logger.Slogger()
+}
package torrent
+
+import (
+ "github.com/anacrolix/torrent/webseed"
+)
+
+// A wrapper around webseed.Request with extra state for webseedPeer.
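+// Indexes in [next, end) are still expected from the response body.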
+type webseedRequest struct {
+ request webseed.Request
+ // First assigned in the range.
+ begin RequestIndex
+ // The next to be read.
+ next RequestIndex
+ // One greater than the end of the range.
+ end RequestIndex
+}
package webseed
import (
- "bytes"
"context"
"errors"
"fmt"
"strings"
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
type Request struct {
cancel func()
- Result chan RequestResult
+ // Closed in the machinery when cancelled?
+ Body io.Reader
+ err chan error
}
func (r Request) Cancel() {
// given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
// level. We can map our file-level adjustments to the pieces here. This probably need to be
// private in the future, if Client ever starts removing pieces.
- Pieces roaring.Bitmap
+ Pieces roaring.Bitmap
+ // This wraps http.Response bodies, for example to limit the download rate.
ResponseBodyWrapper ResponseBodyWrapper
PathEscaper PathEscaper
}
}) {
panic("request out of file bounds")
}
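+ // Stream the response bytes to the caller through a pipe: the goroutine below writes each
+ // part's body to the pipe and closes it with the final error.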
+ body, w := io.Pipe()
req := Request{
cancel: cancel,
- Result: make(chan RequestResult, 1),
+ Body: body,
}
go func() {
- b, err := readRequestPartResponses(ctx, requestParts)
- req.Result <- RequestResult{
- Bytes: b,
- Err: err,
- }
+ err := readRequestPartResponses(ctx, w, requestParts)
+ panicif.Err(w.CloseWithError(err))
}()
return req
}
return me.Msg
}
-func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error {
+// Reads the part in full. All expected bytes must be returned or an error is returned.
+func recvPartResult(ctx context.Context, w io.Writer, part requestPart, resp *http.Response) error {
defer resp.Body.Close()
var body io.Reader = resp.Body
if part.responseBodyWrapper != nil {
}
switch resp.StatusCode {
case http.StatusPartialContent:
- copied, err := io.Copy(buf, body)
+ copied, err := io.Copy(w, body)
if err != nil {
return err
}
// https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that
// archive.org might be using a webserver implementation that refuses to do partial
// responses to small files.
- if part.e.Start < 48<<10 {
- if part.e.Start != 0 {
- log.Printf("resp status ok but requested range [url=%q, range=%q]",
- part.req.URL,
- part.req.Header.Get("Range"))
- }
- // Instead of discarding, we could try receiving all the chunks present in the response
- // body. I don't know how one would handle multiple chunk requests resulting in an OK
- // response for the same file. The request algorithm might be need to be smarter for
- // that.
- discarded, _ := io.CopyN(io.Discard, body, part.e.Start)
- if discarded != 0 {
- log.Printf("discarded %v bytes in webseed request response part", discarded)
- }
- _, err := io.CopyN(buf, body, part.e.Length)
- return err
- } else {
+ discard := part.e.Start
+ if discard > 48<<10 {
return ErrBadResponse{"resp status ok but requested range", resp}
}
+ if discard != 0 {
+ log.Printf("resp status ok but requested range [url=%q, range=%q]",
+ part.req.URL,
+ part.req.Header.Get("Range"))
+ }
+ // Instead of discarding, we could try receiving all the chunks present in the response
+ // body. I don't know how one would handle multiple chunk requests resulting in an OK
+ // response for the same file. The request algorithm might need to be smarter for that.
+ discarded, err := io.CopyN(io.Discard, body, discard)
+ if err != nil {
+ return fmt.Errorf("error discarding bytes from http ok response: %w", err)
+ }
+ panicif.NotEq(discarded, discard)
+ // Because the response is not a partial-content response, we intentionally limit how much of
+ // the body we read.
+ _, err = io.CopyN(w, body, part.e.Length)
+ return err
case http.StatusServiceUnavailable:
return ErrTooFast
default:
+ // TODO: Could we have a slog.Valuer or something to allow callers to unpack reasonable values?
return ErrBadResponse{
- fmt.Sprintf("unhandled response status code (%v)", resp.StatusCode),
+ fmt.Sprintf("unhandled response status (%v)", resp.Status),
resp,
}
}
var ErrTooFast = errors.New("making requests too fast")
-func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
- var buf bytes.Buffer
+func readRequestPartResponses(ctx context.Context, w io.Writer, parts []requestPart) (err error) {
for _, part := range parts {
var resp *http.Response
resp, err = part.do()
-
if err == nil {
- err = recvPartResult(ctx, &buf, part, resp)
+ err = recvPartResult(ctx, w, part, resp)
}
-
if err != nil {
err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
break
}
}
- return buf.Bytes(), err
+ return
}