From 06e6fe721e7f3174b1f563d9c77194f14078bbb9 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 1 Jul 2025 13:28:33 +1000 Subject: [PATCH] Stop reading webseed responses if chunks aren't wanted --- peer-impl.go | 1 - peer.go | 2 +- requesting.go | 3 ++- torrent.go | 17 ++++++++++++++++- webseed-peer.go | 31 +++++++++++++++++++++---------- webseed-request.go | 9 ++++++++- webseed/client.go | 28 ++++++++++++++++++++++------ 7 files changed, 70 insertions(+), 21 deletions(-) diff --git a/peer-impl.go b/peer-impl.go index 211097e8..98d3aa7c 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -14,7 +14,6 @@ import ( type legacyPeerImpl interface { // Trigger the actual request state to get updated handleOnNeedUpdateRequests() - writeInterested(interested bool) bool // Actually go ahead and modify the pending requests. updateRequests() diff --git a/peer.go b/peer.go index 4ea6eb87..26880bf3 100644 --- a/peer.go +++ b/peer.go @@ -380,7 +380,7 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) { return } -func (cn *Peer) setInterested(interested bool) bool { +func (cn *PeerConn) setInterested(interested bool) bool { if cn.requestState.Interested == interested { return true } diff --git a/requesting.go b/requesting.go index 33232f14..f226e05a 100644 --- a/requesting.go +++ b/requesting.go @@ -76,7 +76,8 @@ func (p *peerId) GobDecode(b []byte) error { type ( // A request index is a chunk indexed across the entire torrent. It's a single integer and can // be converted to a protocol request. TODO: This should be private. - RequestIndex = requestStrategy.RequestIndex + RequestIndex = requestStrategy.RequestIndex + // This is request index but per-piece. chunkIndexType = requestStrategy.ChunkIndex ) diff --git a/torrent.go b/torrent.go index 37f304df..21b00514 100644 --- a/torrent.go +++ b/torrent.go @@ -1327,7 +1327,11 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( p.drop() } -func (t *Torrent) haveChunk(r Request) (ret bool) { +func (t *Torrent) haveRequestIndexChunk(reqIndex RequestIndex) bool { + return t.haveChunk(t.requestIndexToRequest(reqIndex)) +} + +func (t *Torrent) haveChunk(r Request) bool { if !t.haveInfo() { return false } @@ -3500,3 +3504,14 @@ func (t *Torrent) endRequestIndexForFileIndex(fileIndex int) RequestIndex { end := intCeilDiv(uint64(f.offset)+uint64(f.length), t.chunkSize.Uint64()) return RequestIndex(end) } + +func (t *Torrent) wantReceiveChunk(reqIndex RequestIndex) bool { + pi := t.pieceIndexOfRequestIndex(reqIndex) + if !t.wantPieceIndex(pi) { + return false + } + if t.haveRequestIndexChunk(reqIndex) { + return false + } + return true +} diff --git a/webseed-peer.go b/webseed-peer.go index 0f5a2944..a8f453c9 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -82,15 +82,8 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) { }) } -func (ws *webseedPeer) writeInterested(interested bool) bool { - return true -} - -func (ws *webseedPeer) handleCancel(r RequestIndex) { - for wr := range ws.activeRequestsForIndex(r) { - wr.Cancel() - } -} +// Webseeds check the next request is wanted before reading it. +func (ws *webseedPeer) handleCancel(RequestIndex) {} func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] { return func(yield func(*webseedRequest) bool) { @@ -268,6 +261,24 @@ func (ws *webseedPeer) onClose() { }) } +// Do we want a chunk, assuming it's valid etc. +func (ws *webseedPeer) wantChunk(ri RequestIndex) bool { + return ws.peer.t.wantReceiveChunk(ri) +} + +func (ws *webseedPeer) maxChunkDiscard() RequestIndex { + return RequestIndex(int(intCeilDiv(webseed.MaxDiscardBytes, ws.peer.t.chunkSize))) +} + +func (ws *webseedPeer) keepReading(wr *webseedRequest) bool { + for ri := wr.next; ri < wr.end && ri < wr.next+ws.maxChunkDiscard(); ri++ { + if ws.wantChunk(ri) { + return true + } + } + return false +} + func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) { t := ws.peer.t buf := t.getChunkBuffer() @@ -275,7 +286,7 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) { msg := pp.Message{ Type: pp.Piece, } - for wr.next < wr.end { + for ws.keepReading(wr) { reqSpec := t.requestIndexToRequest(wr.next) chunkLen := reqSpec.Length.Int() buf = buf[:chunkLen] diff --git a/webseed-request.go b/webseed-request.go index 7e601d6b..51f76199 100644 --- a/webseed-request.go +++ b/webseed-request.go @@ -1,6 +1,8 @@ package torrent import ( + "fmt" + "github.com/anacrolix/torrent/webseed" ) @@ -23,6 +25,11 @@ func (me *webseedRequest) Close() { // Record that it was exceptionally cancelled. func (me *webseedRequest) Cancel() { - me.cancelled = true me.request.Cancel() + if !me.cancelled { + me.cancelled = true + if webseed.PrintDebug { + fmt.Printf("cancelled webseed request\n") + } + } } diff --git a/webseed/client.go b/webseed/client.go index 52d4bb81..edf1ef6e 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -12,11 +12,21 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/anacrolix/missinggo/v2/panicif" + "github.com/dustin/go-humanize" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) +// How many consecutive bytes to allow discarding from responses. This number is based on +// https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that +// archive.org might be using a webserver implementation that refuses to do partial responses to +// small files. +const MaxDiscardBytes = 48 << 10 + +// Output debug information to stdout. +const PrintDebug = false + type RequestSpec = segments.Extent type requestPart struct { @@ -45,12 +55,14 @@ type Client struct { // Max concurrent requests to a WebSeed for a given torrent. MaxRequests int + // TODO: Share this with Torrent. fileIndex segments.Index info *metainfo.Info // The pieces we can request with the Url. We're more likely to ban/block at the file-level // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece // level. We can map our file-level adjustments to the pieces here. This probably need to be - // private in the future, if Client ever starts removing pieces. + // private in the future, if Client ever starts removing pieces. TODO: This belongs in + // webseedPeer. Pieces roaring.Bitmap // This wraps http.Response bodies, for example to limit the download rate. ResponseBodyWrapper ResponseBodyWrapper @@ -98,6 +110,14 @@ func (ws *Client) StartNewRequest(r RequestSpec) Request { responseBodyWrapper: ws.ResponseBodyWrapper, } part.do = func() (*http.Response, error) { + if PrintDebug { + fmt.Printf( + "doing request for %q (file size %v), Range: %q\n", + req.URL, + humanize.Bytes(uint64(ws.fileIndex.Index(i).Length)), + req.Header.Get("Range"), + ) + } return ws.HttpClient.Do(req) } requestParts = append(requestParts, part) @@ -171,12 +191,8 @@ func (me *Client) recvPartResult(ctx context.Context, w io.Writer, part requestP case http.StatusOK: // The response is from the beginning. me.checkContentLength(resp, part, part.e.End()) - // This number is based on - // https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that - // archive.org might be using a webserver implementation that refuses to do partial - // responses to small files. discard := part.e.Start - if discard > 48<<10 { + if discard > MaxDiscardBytes { return ErrBadResponse{"resp status ok but requested range", resp} } if discard != 0 { -- 2.51.0