From: Matt Joiner Date: Tue, 5 Aug 2025 12:42:59 +0000 (+1000) Subject: Propagate better webseed cancellation errors and cancel requests on peer context X-Git-Tag: v1.59.0~2^2~54 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=02bf07949520a77012a3dffc0621c3760322dcdc;p=btrtrc.git Propagate better webseed cancellation errors and cancel requests on peer context --- diff --git a/errors.go b/errors.go index 10cbafc7..06430139 100644 --- a/errors.go +++ b/errors.go @@ -1 +1,8 @@ package torrent + +// I don't trust errors.New with allocations, and I know I can use unique.Handle if I get desperate. +type stringError string + +func (e stringError) Error() string { + return string(e) +} diff --git a/webseed-peer.go b/webseed-peer.go index 230df90a..5dcef078 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -134,7 +134,7 @@ func (ws *webseedPeer) intoSpec(begin, end RequestIndex) webseed.RequestSpec { } func (ws *webseedPeer) spawnRequest(begin, end RequestIndex, logger *slog.Logger) { - extWsReq := ws.client.StartNewRequest(ws.intoSpec(begin, end), logger) + extWsReq := ws.client.StartNewRequest(ws.peer.closedCtx, ws.intoSpec(begin, end), logger) wsReq := webseedRequest{ logger: logger, request: extWsReq, @@ -263,7 +263,9 @@ func (ws *webseedPeer) maxChunkDiscard() RequestIndex { return RequestIndex(int(intCeilDiv(webseed.MaxDiscardBytes, ws.peer.t.chunkSize))) } -func (ws *webseedPeer) keepReading(wr *webseedRequest) bool { +func (ws *webseedPeer) wantedChunksInDiscardWindow(wr *webseedRequest) bool { + // Shouldn't call this if request is at the end already. 
+    panicif.GreaterThanOrEqual(wr.next, wr.end)
     for ri := wr.next; ri < wr.end && ri <= wr.next+ws.maxChunkDiscard(); ri++ {
         if ws.wantChunk(ri) {
             return true
@@ -306,8 +308,10 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) {
             err = ws.peer.receiveChunk(&msg)
             stop := err != nil || wr.next >= wr.end
             if !stop {
-                if !ws.keepReading(wr) {
-                    wr.Cancel("finished or discarded")
+                if !ws.wantedChunksInDiscardWindow(wr) {
+                    // This cancels the stream, but we don't stop reading to make the most of the
+                    // buffered body.
+                    wr.Cancel("no wanted chunks in discard window")
                 }
             }
             ws.peer.locker().Unlock()
diff --git a/webseed-request.go b/webseed-request.go
index c0db9dbe..bf267fa3 100644
--- a/webseed-request.go
+++ b/webseed-request.go
@@ -27,11 +27,11 @@ func (me *webseedRequest) Close() {
 }
 
 // Record that it was exceptionally cancelled.
-func (me *webseedRequest) Cancel(reason string) {
-    me.request.Cancel()
+func (me *webseedRequest) Cancel(cause string) {
+    me.request.Cancel(stringError(cause))
     if !me.cancelled.Swap(true) {
         if webseed.PrintDebug {
-            me.logger.Debug("cancelled", "reason", reason)
+            me.logger.Debug("cancelled", "cause", cause)
         }
     }
 }
diff --git a/webseed/client.go b/webseed/client.go
index 2076e2c5..c15b146e 100644
--- a/webseed/client.go
+++ b/webseed/client.go
@@ -46,14 +46,14 @@ type requestPart struct {
 }
 
 type Request struct {
-    cancel func()
+    cancel context.CancelCauseFunc
     Body io.Reader
     // Closed with error to unstick copy routine when context isn't checked. 
bodyPipe *io.PipeReader } -func (r Request) Cancel() { - r.cancel() +func (r Request) Cancel(cause error) { + r.cancel(cause) } func (r Request) Close() { @@ -106,8 +106,8 @@ func (ws *Client) UrlForFileIndex(fileIndex int) string { return urlForFileIndex(ws.Url, fileIndex, ws.info, ws.PathEscaper) } -func (ws *Client) StartNewRequest(r RequestSpec, debugLogger *slog.Logger) Request { - ctx, cancel := context.WithCancel(context.TODO()) +func (ws *Client) StartNewRequest(ctx context.Context, r RequestSpec, debugLogger *slog.Logger) Request { + ctx, cancel := context.WithCancelCause(ctx) var requestParts []requestPart for i, e := range ws.fileIndex.LocateIter(r) { req, err := newRequest(