From a4efd622514cb5955a76a9b4aa931f7ae64a7b49 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 16 May 2025 15:19:24 +1000 Subject: [PATCH] Implement multiple chunk reads for webseed --- config.go | 3 +- math.go | 10 ++ misc.go | 20 --- peer-impl.go | 4 +- peer.go | 17 +- peerconn.go | 14 +- request-strategy-impls_test.go | 2 +- requesting.go | 14 +- torrent.go | 35 ++++ webseed-peer.go | 301 ++++++++++++++++++++++----------- webseed-request.go | 15 ++ webseed/client.go | 72 ++++---- 12 files changed, 336 insertions(+), 171 deletions(-) diff --git a/config.go b/config.go index 2955f555..e0727865 100644 --- a/config.go +++ b/config.go @@ -152,7 +152,8 @@ type ClientConfig struct { // How long between writes before sending a keep alive message on a peer connection that we want // to maintain. KeepAliveTimeout time.Duration - // Maximum bytes to buffer per peer connection for peer request data before it is sent. + // Maximum bytes to buffer per peer connection for peer request data before it is sent. This + // must be >= the request chunk size from peers. MaxAllocPeerRequestDataPerConn int // The IP addresses as our peers should see them. May differ from the diff --git a/math.go b/math.go index 10cbafc7..cd733e82 100644 --- a/math.go +++ b/math.go @@ -1 +1,11 @@ package torrent + +import ( + "golang.org/x/exp/constraints" +) + +func intCeilDiv[T constraints.Integer](a, b T) T { + // This still sux for negative numbers due to truncating division. But I don't know that we need + // or ceil division makes sense for negative numbers. + return (a + b - 1) / b +} diff --git a/misc.go b/misc.go index 12a454de..ba4597ed 100644 --- a/misc.go +++ b/misc.go @@ -125,26 +125,6 @@ func connIsIpv6(nc interface { return rip.To4() == nil && rip.To16() != nil } -func maxInt(as ...int) int { - ret := as[0] - for _, a := range as[1:] { - if a > ret { - ret = a - } - } - return ret -} - -func minInt(as ...int) int { - ret := as[0] - for _, a := range as[1:] { - if a < ret { - ret = a - } - } - return ret -} - var unlimited = rate.NewLimiter(rate.Inf, 0) type ( diff --git a/peer-impl.go b/peer-impl.go index d23ad6d8..f3e13077 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -6,7 +6,7 @@ import ( "github.com/anacrolix/torrent/metainfo" ) -// Contains implementation details that differ between peer types, like Webseeds and regular +// Contains implementation details that differ between peer types, like WebSeeds and regular // BitTorrent protocol connections. These methods are embedded in the child types of Peer for legacy // expectations that they exist on the child type. Some methods are underlined to avoid collisions // with legacy PeerConn methods. New methods and calls that are fixed up should be migrated over to @@ -46,4 +46,6 @@ type legacyPeerImpl interface { // Abstract methods implemented by subclasses of Peer. type newHotPeerImpl interface { lastWriteUploadRate() float64 + // How many requests should be assigned to the peer. + nominalMaxRequests() maxRequests } diff --git a/peer.go b/peer.go index 2f4a9ea7..113dcb31 100644 --- a/peer.go +++ b/peer.go @@ -58,7 +58,8 @@ type ( lastChunkSent time.Time // Stuff controlled by the local peer. - needRequestUpdate updateRequestReason + needRequestUpdate updateRequestReason + // TODO: How are pending cancels handled for webseed peers? 
requestState request_strategy.PeerRequestState updateRequestsTimer *time.Timer lastRequestUpdate time.Time @@ -322,7 +323,7 @@ func (cn *Peer) writeStatus(w io.Writer) { &cn._stats.ChunksWritten, cn.requestState.Requests.GetCardinality(), cn.requestState.Cancelled.GetCardinality(), - cn.nominalMaxRequests(), + cn.peerImpl.nominalMaxRequests(), cn.PeerMaxRequests, len(cn.peerRequests), localClientReqq, @@ -387,7 +388,7 @@ var ( ) // The actual value to use as the maximum outbound requests. -func (cn *Peer) nominalMaxRequests() maxRequests { +func (cn *PeerConn) nominalMaxRequests() maxRequests { // TODO: This should differ for webseeds... return max(1, min(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)) } @@ -455,7 +456,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { return nil } -func (cn *PeerConn) mustRequest(r RequestIndex) bool { +func (cn *Peer) mustRequest(r RequestIndex) bool { more, err := cn.request(r) if err != nil { panic(err) @@ -463,14 +464,14 @@ func (cn *PeerConn) mustRequest(r RequestIndex) bool { return more } -func (cn *PeerConn) request(r RequestIndex) (more bool, err error) { +func (cn *Peer) request(r RequestIndex) (more bool, err error) { if err := cn.shouldRequest(r); err != nil { panic(err) } if cn.requestState.Requests.Contains(r) { return true, nil } - if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { + if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() { return true, errors.New("too many outstanding requests") } cn.requestState.Requests.Add(r) @@ -479,13 +480,13 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) { } cn.validReceiveChunks[r]++ cn.t.requestState[r] = requestState{ - peer: &cn.Peer, + peer: cn, when: time.Now(), } cn.updateExpectingChunks() ppReq := cn.t.requestIndexToRequest(r) for _, f := range cn.callbacks.SentRequest { - f(PeerRequestEvent{&cn.Peer, ppReq}) + f(PeerRequestEvent{cn, ppReq}) } return cn.legacyPeerImpl._request(ppReq), nil } diff --git a/peerconn.go b/peerconn.go index 94ec1f4e..420af5be 100644 --- a/peerconn.go +++ b/peerconn.go @@ -325,8 +325,11 @@ func (me *PeerConn) _request(r Request) bool { }) } -func (me *PeerConn) _cancel(r RequestIndex) bool { +func (me *PeerConn) handleCancel(r RequestIndex) { me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) +} + +func (me *PeerConn) acksCancels() bool { return me.remoteRejectsCancels() } @@ -849,9 +852,8 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Piece: c.doChunkReadStats(int64(len(msg.Piece))) err = c.receiveChunk(&msg) - if len(msg.Piece) == int(t.chunkSize) { - t.chunkPool.Put(&msg.Piece) - } + t.putChunkBuffer(msg.Piece) + msg.Piece = nil if err != nil { err = fmt.Errorf("receiving chunk: %w", err) } @@ -1446,3 +1448,7 @@ func hashRequestFromMessage(m pp.Message) hashRequest { proofLayers: m.ProofLayers, } } + +func (me *PeerConn) peerPtr() *Peer { + return &me.Peer +} diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go index daa52217..3940da0e 100644 --- a/request-strategy-impls_test.go +++ b/request-strategy-impls_test.go @@ -129,7 +129,7 @@ func BenchmarkRequestStrategy(b *testing.B) { tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes) // End of part that should be timed. 
remainingChunks := (numPieces - completed) * (pieceLength / chunkSize) - c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt( + c.Assert(rs.Requests.requestIndexes, qt.HasLen, min( remainingChunks, int(cl.config.MaxUnverifiedBytes/chunkSize))) } diff --git a/requesting.go b/requesting.go index 958e11b8..8680665d 100644 --- a/requesting.go +++ b/requesting.go @@ -184,7 +184,7 @@ func (t *Torrent) getRequestablePieces(f requestStrategy.RequestPieceFunc) { // This gets the best-case request state. That means handling pieces limited by capacity, preferring // earlier pieces, low availability etc. It pays no attention to existing requests on the peer or // other peers. Those are handled later. -func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) { +func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { t := p.t if !t.haveInfo() { return @@ -257,7 +257,9 @@ func (p *Peer) maybeUpdateActualRequestState() { panic(since) } } - p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate) + if p.t.cl.config.Debug { + p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate) + } pprof.Do( context.Background(), pprof.Labels("update request", string(p.needRequestUpdate)), @@ -304,7 +306,7 @@ func (p *Peer) allowSendNotInterested() bool { // Transmit/action the request state to the peer. This includes work-stealing from other peers and // some piece order randomization within the preferred state calculated earlier in next. -func (p *PeerConn) applyRequestState(next desiredRequestState) { +func (p *Peer) applyRequestState(next desiredRequestState) { current := &p.requestState // Make interest sticky if !next.Interested && p.requestState.Interested { @@ -330,7 +332,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) { break } numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality()) - if numPending >= p.nominalMaxRequests() { + if numPending >= p.peerImpl.nominalMaxRequests() { break } req := heap.Pop(requestHeap) @@ -338,8 +340,8 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) { panic("changed") } - // don't add requests on reciept of a reject - because this causes request back - // to potentially permanently unresponive peers - which just adds network noise. If + // don't add requests on receipt of a reject - because this causes request back + // to potentially permanently unresponsive peers - which just adds network noise. If // the peer can handle more requests it will send an "unchoked" message - which // will cause it to get added back to the request queue if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason { diff --git a/torrent.go b/torrent.go index 5fbec4b3..da5b6955 100644 --- a/torrent.go +++ b/torrent.go @@ -1074,6 +1074,16 @@ func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() { } } +// The whole-torrent first byte position. +func (t *Torrent) requestIndexBegin(r RequestIndex) int64 { + return t.requestOffset(t.requestIndexToRequest(r)) +} + +func (t *Torrent) requestIndexEnd(r RequestIndex) int64 { + req := t.requestIndexToRequest(r) + return t.requestOffset(req) + int64(req.Length) +} + func (t *Torrent) requestOffset(r Request) int64 { return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r) } @@ -3376,3 +3386,28 @@ func (t *Torrent) Complete() chansync.ReadOnlyFlag { func (t *Torrent) slogger() *slog.Logger { return t._slogger } + +// Get a chunk buffer from the pool. 
It should be returned when it's no longer in use. Do we +// waste an allocation if we throw away the pointer it was stored with? +func (t *Torrent) getChunkBuffer() []byte { + return *t.chunkPool.Get().(*[]byte) +} + +func (t *Torrent) putChunkBuffer(b []byte) { + panicif.NotEq(cap(b), t.chunkSize.Int()) + // Does this allocate? Are we amortizing against the cost of a large buffer? + t.chunkPool.Put(&b) +} + +func (t *Torrent) withSlogger(base *slog.Logger) *slog.Logger { + return base.With(slog.Group( + "torrent", + "name", lazyLogValuer(func() any { + opt := t.bestName() + if opt.Ok { + return opt.Value + } + return nil + }), + "ih", *t.canonicalShortInfohash())) +} diff --git a/webseed-peer.go b/webseed-peer.go index 8804137a..95fc2157 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -4,39 +4,47 @@ import ( "context" "errors" "fmt" + "io" "iter" - "math/rand" + "log/slog" + "slices" "sync" "time" "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/log" + g "github.com/anacrolix/generics" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/webseed" ) -const ( - webseedPeerCloseOnUnhandledError = false -) - type webseedPeer struct { // First field for stats alignment. peer Peer client webseed.Client - activeRequests map[Request]webseed.Request + activeRequests map[*webseedRequest]struct{} locker sync.Locker lastUnhandledErr time.Time } +func (me *webseedPeer) nominalMaxRequests() maxRequests { + // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now + // we just allow 2 MiB worth of requests. + return intCeilDiv(2<<20, me.peer.t.chunkSize.Int()) +} + func (me *webseedPeer) acksCancels() bool { return false } func (me *webseedPeer) updateRequests() { - //TODO implement me - panic("implement me") + p := &me.peer + next := p.getDesiredRequestState() + p.applyRequestState(next) + p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes) + // Run this after all requests applied to the peer, so they can be batched up. + me.spawnRequests() } func (me *webseedPeer) lastWriteUploadRate() float64 { @@ -71,51 +79,83 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) _cancel(r RequestIndex) bool { - if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok { - active.Cancel() - // The requester is running and will handle the result. - return true +func (ws *webseedPeer) handleCancel(r RequestIndex) { + for wr := range ws.activeRequestsForIndex(r) { + wr.request.Cancel() } - // There should be no requester handling this, so no further events will occur. 
- return false } -func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { - return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} +func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] { + return func(yield func(*webseedRequest) bool) { + for wr := range ws.activeRequests { + if r < wr.next || r >= wr.end { + continue + } + if !yield(wr) { + return + } + } + } +} + +func (ws *webseedPeer) requestIndexTorrentOffset(r RequestIndex) int64 { + return ws.peer.t.requestIndexBegin(r) +} + +func (ws *webseedPeer) intoSpec(begin, end RequestIndex) webseed.RequestSpec { + t := ws.peer.t + start := t.requestIndexBegin(begin) + endOff := t.requestIndexEnd(end - 1) + return webseed.RequestSpec{start, endOff - start} } func (ws *webseedPeer) _request(r Request) bool { - ws.spawnRequests() return true } -func (ws *webseedPeer) spawnReq(r Request) { - webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r)) - ws.activeRequests[r] = webseedRequest - go ws.doRequest(r, webseedRequest) +func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) { + extWsReq := ws.client.StartNewRequest(ws.intoSpec(begin, end)) + wsReq := webseedRequest{ + request: extWsReq, + begin: begin, + next: begin, + end: end, + } + ws.activeRequests[&wsReq] = struct{}{} + ws.peer.logger.Slogger().Debug( + "starting webseed request", + "begin", begin, + "end", end, + "len", end-begin, + "avail", ws.peer.requestState.Requests.GetCardinality()) + go ws.runRequest(&wsReq) } -func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) { +func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) { locker := ws.locker - err := ws.requestResultHandler(r, webseedRequest) + err := ws.readChunks(webseedRequest) + // Ensure the body reader and response are closed. + webseedRequest.request.Cancel() + locker.Lock() if err != nil { - level := log.Warning + level := slog.LevelWarn if errors.Is(err, context.Canceled) { - level = log.Debug - } - ws.peer.logger.Levelf(level, "error doing webseed request %v: %v", r, err) - // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any - // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around - // it doesn't hurt to slow a few down if there are issues. - select { - case <-ws.peer.closed.Done(): - case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): + level = slog.LevelDebug + } else { + panic(err) } + ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err) + // // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any + // // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around + // // it doesn't hurt to slow a few down if there are issues. + // select { + // case <-ws.peer.closed.Done(): + // case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))): + // } } - locker.Lock() + //locker.Lock() // Delete this entry after waiting above on an error, to prevent more requests. - delete(ws.activeRequests, r) + delete(ws.activeRequests, webseedRequest) if err != nil { ws.peer.onNeedUpdateRequests("webseedPeer request errored") } @@ -131,19 +171,65 @@ func (ws *webseedPeer) spawnRequests() { if !ok { break } - ws.spawnReq(req) + end := seqLast(ws.iterConsecutiveInactiveRequests(req)).Unwrap() + ws.spawnRequest(req, end+1) + } +} + +// Returns Some of the last item in a iter.Seq, or None if the sequence is empty. 
+func seqLast[V any](seq iter.Seq[V]) (last g.Option[V]) { + for item := range seq { + last.Set(item) + } + return +} + +func (ws *webseedPeer) iterConsecutiveRequests(begin RequestIndex) iter.Seq[RequestIndex] { + return func(yield func(RequestIndex) bool) { + for { + if !ws.peer.requestState.Requests.Contains(begin) { + return + } + if !yield(begin) { + return + } + begin++ + } } } -func (ws *webseedPeer) inactiveRequests() iter.Seq[Request] { - return func(yield func(Request) bool) { +func (ws *webseedPeer) iterConsecutiveInactiveRequests(begin RequestIndex) iter.Seq[RequestIndex] { + return func(yield func(RequestIndex) bool) { + for req := range ws.iterConsecutiveRequests(begin) { + if !ws.inactiveRequestIndex(req) { + return + } + if !yield(req) { + return + } + } + } +} + +func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool { + for range ws.activeRequestsForIndex(index) { + return false + } + return true +} + +func (ws *webseedPeer) inactiveRequests() iter.Seq[RequestIndex] { + return func(yield func(RequestIndex) bool) { + sorted := slices.Sorted(ws.peer.requestState.Requests.Iterator()) + if len(sorted) != 0 { + fmt.Println("inactiveRequests", sorted) + } for reqIndex := range ws.peer.requestState.Requests.Iterator() { - r := ws.peer.t.requestIndexToRequest(reqIndex) - _, ok := ws.activeRequests[r] - if !ok { - if !yield(r) { - return - } + if !ws.inactiveRequestIndex(reqIndex) { + continue + } + if !yield(reqIndex) { + return } } } @@ -171,7 +257,6 @@ func (ws *webseedPeer) handleOnNeedUpdateRequests() { } func (ws *webseedPeer) onClose() { - ws.peer.logger.Levelf(log.Debug, "closing") // Just deleting them means we would have to manually cancel active requests. ws.peer.cancelAllRequests() ws.peer.t.iterPeers(func(p *Peer) { @@ -181,59 +266,81 @@ func (ws *webseedPeer) onClose() { }) } -func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error { - result := <-webseedRequest.Result - close(webseedRequest.Result) // one-shot - // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not - // sure if we can divine which errors indicate cancellation on our end without hitting the - // network though. - if len(result.Bytes) != 0 || result.Err == nil { - // Increment ChunksRead and friends - ws.peer.doChunkReadStats(int64(len(result.Bytes))) - } - ws.peer.readBytes(int64(len(result.Bytes))) - ws.peer.t.cl.lock() - defer ws.peer.t.cl.unlock() - if ws.peer.t.closed.IsSet() { - return nil - } - err := result.Err - if err != nil { - switch { - case errors.Is(err, context.Canceled): - case errors.Is(err, webseed.ErrTooFast): - case ws.peer.closed.IsSet(): - default: - ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) - // // Here lies my attempt to extract something concrete from Go's error system. RIP. 
- // cfg := spew.NewDefaultConfig() - // cfg.DisableMethods = true - // cfg.Dump(result.Err) - - if webseedPeerCloseOnUnhandledError { - log.Printf("closing %v", ws) - ws.peer.close() - } else { - ws.lastUnhandledErr = time.Now() - } +func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) { + t := ws.peer.t + buf := t.getChunkBuffer() + defer t.putChunkBuffer(buf) + for ; wr.next < wr.end; wr.next++ { + reqSpec := t.requestIndexToRequest(wr.next) + chunkLen := reqSpec.Length.Int() + buf = buf[:chunkLen] + var n int + n, err = io.ReadFull(wr.request.Body, buf) + ws.peer.readBytes(int64(n)) + if err != nil { + err = fmt.Errorf("reading chunk: %w", err) + return } - if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { - panic("invalid reject") + ws.peer.doChunkReadStats(int64(chunkLen)) + ws.peer.locker().Lock() + err = ws.peer.receiveChunk(&pp.Message{ + Type: pp.Piece, + Piece: buf, + Index: reqSpec.Index, + Begin: reqSpec.Begin, + }) + ws.peer.locker().Unlock() + if err != nil { + err = fmt.Errorf("processing chunk: %w", err) + return } - return err } - err = ws.peer.receiveChunk(&pp.Message{ - Type: pp.Piece, - Index: r.Index, - Begin: r.Begin, - Piece: result.Bytes, - }) - if err != nil { - panic(err) - } - return err + return } +// +//func (ws *webseedPeer) requestResultHandler(wr *webseedRequest) (err error) { +// err = ws.readChunks(wr) +// switch { +// case err == nil: +// case ws.peer.closed.IsSet(): +// case errors.Is(err, context.Canceled): +// case errors.Is(err, webseed.ErrTooFast): +// default: +// +// } +// ws.peer.t.cl.lock() +// defer ws.peer.t.cl.unlock() +// if ws.peer.t.closed.IsSet() { +// return nil +// } +// if err != nil { +// switch { +// case errors.Is(err, context.Canceled): +// case errors.Is(err, webseed.ErrTooFast): +// case ws.peer.closed.IsSet(): +// default: +// ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) +// // // Here lies my attempt to extract something concrete from Go's error system. RIP. +// // cfg := spew.NewDefaultConfig() +// // cfg.DisableMethods = true +// // cfg.Dump(result.Err) +// +// if webseedPeerCloseOnUnhandledError { +// log.Printf("closing %v", ws) +// ws.peer.close() +// } else { +// ws.lastUnhandledErr = time.Now() +// } +// } +// if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { +// panic("invalid reject") +// } +// return err +// } +// return err +//} + func (me *webseedPeer) peerPieces() *roaring.Bitmap { return &me.client.Pieces } @@ -244,3 +351,7 @@ func (cn *webseedPeer) peerHasAllPieces() (all, known bool) { } return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true } + +func (me *webseedPeer) slogger() *slog.Logger { + return me.peer.logger.Slogger() +} diff --git a/webseed-request.go b/webseed-request.go index 10cbafc7..0f8440db 100644 --- a/webseed-request.go +++ b/webseed-request.go @@ -1 +1,16 @@ package torrent + +import ( + "github.com/anacrolix/torrent/webseed" +) + +// A wrapper around webseed.Request with extra state for webseedPeer. +type webseedRequest struct { + request webseed.Request + // First assigned in the range. + begin RequestIndex + // The next to be read. + next RequestIndex + // One greater than the end of the range. 
+ end RequestIndex +} diff --git a/webseed/client.go b/webseed/client.go index 9e9a96ee..36f2f569 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -1,7 +1,6 @@ package webseed import ( - "bytes" "context" "errors" "fmt" @@ -11,6 +10,7 @@ import ( "strings" "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" @@ -28,7 +28,9 @@ type requestPart struct { type Request struct { cancel func() - Result chan RequestResult + // Closed in the machinery when cancelled? + Body io.Reader + err chan error } func (r Request) Cancel() { @@ -47,7 +49,8 @@ type Client struct { // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece // level. We can map our file-level adjustments to the pieces here. This probably need to be // private in the future, if Client ever starts removing pieces. - Pieces roaring.Bitmap + Pieces roaring.Bitmap + // This wraps http.Response bodies, for example to limit the download rate. ResponseBodyWrapper ResponseBodyWrapper PathEscaper PathEscaper } @@ -95,16 +98,14 @@ func (ws *Client) StartNewRequest(r RequestSpec) Request { }) { panic("request out of file bounds") } + body, w := io.Pipe() req := Request{ cancel: cancel, - Result: make(chan RequestResult, 1), + Body: body, } go func() { - b, err := readRequestPartResponses(ctx, requestParts) - req.Result <- RequestResult{ - Bytes: b, - Err: err, - } + err := readRequestPartResponses(ctx, w, requestParts) + panicif.Err(w.CloseWithError(err)) }() return req } @@ -118,7 +119,8 @@ func (me ErrBadResponse) Error() string { return me.Msg } -func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error { +// Reads the part in full. All expected bytes must be returned or there will an error returned. +func recvPartResult(ctx context.Context, w io.Writer, part requestPart, resp *http.Response) error { defer resp.Body.Close() var body io.Reader = resp.Body if part.responseBodyWrapper != nil { @@ -131,7 +133,7 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp * } switch resp.StatusCode { case http.StatusPartialContent: - copied, err := io.Copy(buf, body) + copied, err := io.Copy(w, body) if err != nil { return err } @@ -144,30 +146,33 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp * // https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that // archive.org might be using a webserver implementation that refuses to do partial // responses to small files. - if part.e.Start < 48<<10 { - if part.e.Start != 0 { - log.Printf("resp status ok but requested range [url=%q, range=%q]", - part.req.URL, - part.req.Header.Get("Range")) - } - // Instead of discarding, we could try receiving all the chunks present in the response - // body. I don't know how one would handle multiple chunk requests resulting in an OK - // response for the same file. The request algorithm might be need to be smarter for - // that. 
- discarded, _ := io.CopyN(io.Discard, body, part.e.Start) - if discarded != 0 { - log.Printf("discarded %v bytes in webseed request response part", discarded) - } - _, err := io.CopyN(buf, body, part.e.Length) - return err - } else { + discard := part.e.Start + if discard > 48<<10 { return ErrBadResponse{"resp status ok but requested range", resp} } + if discard != 0 { + log.Printf("resp status ok but requested range [url=%q, range=%q]", + part.req.URL, + part.req.Header.Get("Range")) + } + // Instead of discarding, we could try receiving all the chunks present in the response + // body. I don't know how one would handle multiple chunk requests resulting in an OK + // response for the same file. The request algorithm might be need to be smarter for that. + discarded, err := io.CopyN(io.Discard, body, discard) + if err != nil { + return fmt.Errorf("error discarding bytes from http ok response: %w", err) + } + panicif.NotEq(discarded, discard) + // Because the reply is not a partial aware response, we limit the body reader + // intentionally. + _, err = io.CopyN(w, body, part.e.Length) + return err case http.StatusServiceUnavailable: return ErrTooFast default: + // TODO: Could we have a slog.Valuer or something to allow callers to unpack reasonable values? return ErrBadResponse{ - fmt.Sprintf("unhandled response status code (%v)", resp.StatusCode), + fmt.Sprintf("unhandled response status code (%v)", resp.Status), resp, } } @@ -175,20 +180,17 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp * var ErrTooFast = errors.New("making requests too fast") -func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) { - var buf bytes.Buffer +func readRequestPartResponses(ctx context.Context, w io.Writer, parts []requestPart) (err error) { for _, part := range parts { var resp *http.Response resp, err = part.do() - if err == nil { - err = recvPartResult(ctx, &buf, part, resp) + err = recvPartResult(ctx, w, part, resp) } - if err != nil { err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err) break } } - return buf.Bytes(), err + return } -- 2.51.0
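Postscript: a rough, self-contained sketch of the peer-side batching scheme this patch implements. Consecutive outstanding chunk indexes are coalesced into a half-open range [begin, end), that range is turned into a single HTTP byte range, and the streamed body is consumed one chunk-sized buffer at a time with io.ReadFull, in the spirit of spawnRequests/intoSpec/readChunks above. All names below (requestIndex, chunkSize, coalesce, rangeHeader, readChunks, deliver) are hypothetical stand-ins for illustration, not the library's API, and the 4-byte chunk size exists only to keep the output readable.

package main

import (
	"fmt"
	"io"
	"strings"
)

// Hypothetical stand-in for the library's RequestIndex (an index into the
// torrent's chunks, in whole-torrent order).
type requestIndex = uint32

// Tiny chunk size so the example output stays readable; the real default is 16 KiB.
const chunkSize = 4

// Non-generic version of the intCeilDiv helper the patch adds in math.go.
func intCeilDiv(a, b int) int { return (a + b - 1) / b }

// coalesce returns the end (exclusive) of the run of consecutive indexes,
// starting at begin, that are all still pending -- the same idea as
// iterConsecutiveInactiveRequests + seqLast feeding spawnRequest.
func coalesce(pending map[requestIndex]bool, begin requestIndex) (end requestIndex) {
	end = begin
	for pending[end] {
		end++
	}
	return
}

// rangeHeader maps the half-open chunk range [begin, end) to an HTTP Range
// header value, analogous to intoSpec producing a webseed.RequestSpec.
func rangeHeader(begin, end requestIndex) string {
	start := int64(begin) * chunkSize
	last := int64(end)*chunkSize - 1 // Range headers use inclusive end offsets.
	return fmt.Sprintf("bytes=%d-%d", start, last)
}

// readChunks consumes a streamed response body one chunk at a time, as
// readChunks in webseed-peer.go does with io.ReadFull into a pooled buffer.
// The buffer is reused across chunks, so deliver must not retain it.
func readChunks(body io.Reader, begin, end requestIndex, deliver func(requestIndex, []byte)) error {
	buf := make([]byte, chunkSize)
	for i := begin; i < end; i++ {
		if _, err := io.ReadFull(body, buf); err != nil {
			return fmt.Errorf("reading chunk %v: %w", i, err)
		}
		deliver(i, buf)
	}
	return nil
}

func main() {
	// Chunks 2..5 are outstanding; they coalesce into one ranged request.
	pending := map[requestIndex]bool{2: true, 3: true, 4: true, 5: true}
	begin := requestIndex(2)
	end := coalesce(pending, begin)
	fmt.Println("Range:", rangeHeader(begin, end)) // bytes=8-23

	// Pretend this is the body served for that range.
	body := strings.NewReader("aaaabbbbccccdddd")
	if err := readChunks(body, begin, end, func(i requestIndex, b []byte) {
		fmt.Printf("chunk %d: %q\n", i, b)
	}); err != nil {
		fmt.Println("error:", err)
	}

	// The same ceiling division the patch uses to budget roughly 2 MiB of
	// outstanding requests per webseed peer, assuming the common 16 KiB chunk size.
	fmt.Println("request budget:", intCeilDiv(2<<20, 16<<10), "chunks")
}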
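A second sketch, this one of the client-side change in webseed/client.go: rather than buffering an entire response into a []byte delivered over a result channel, the request now exposes a streaming Body backed by io.Pipe, which a goroutine fills from each ranged part and closes with the final error. startRequest below is a hypothetical stand-in that accepts plain io.Readers where the real code issues HTTP requests, so it illustrates the pipe pattern only.

package main

import (
	"fmt"
	"io"
	"strings"
)

// startRequest stitches the bodies of several ranged sub-requests into a single
// stream via io.Pipe, mirroring the shape of Client.StartNewRequest after the
// patch (readers stand in for the HTTP response bodies fetched by the real code).
func startRequest(parts []io.Reader) io.Reader {
	body, w := io.Pipe()
	go func() {
		var err error
		for _, part := range parts {
			if _, err = io.Copy(w, part); err != nil {
				break
			}
		}
		// A nil error closes the pipe cleanly so the consumer sees io.EOF once
		// every part has been drained; a non-nil error is surfaced to it instead.
		w.CloseWithError(err)
	}()
	return body
}

func main() {
	body := startRequest([]io.Reader{
		strings.NewReader("part one, "),
		strings.NewReader("part two"),
	})
	buf := make([]byte, 8)
	for {
		n, err := io.ReadFull(body, buf)
		if n > 0 {
			fmt.Printf("chunk: %q\n", buf[:n])
		}
		if err != nil {
			// io.EOF at a chunk boundary, io.ErrUnexpectedEOF on a short tail.
			break
		}
	}
}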