Sergey Matveev's repositories - btrtrc.git/commitdiff
Propagate better webseed cancellation errors and cancel requests on peer context
author: Matt Joiner <anacrolix@gmail.com>
Tue, 5 Aug 2025 12:42:59 +0000 (22:42 +1000)
committer: Matt Joiner <anacrolix@gmail.com>
Tue, 5 Aug 2025 12:42:59 +0000 (22:42 +1000)
errors.go
webseed-peer.go
webseed-request.go
webseed/client.go

index 10cbafc73d7b9237c864a6f3c625bf9c74366b21..064301396cc856a424ad02b330c249720df4b484 100644 (file)
--- a/errors.go
+++ b/errors.go
@@ -1 +1,8 @@
 package torrent
+
+// I don't trust errors.New with allocations, and I know I can use unique.Handle if I get desperate.
+type stringError string
+
+func (e stringError) Error() string {
+       return string(e)
+}
index 230df90a6c27e750d5be25b881013587dc53dd6b..5dcef0786beb8ffa5340829086591f97d2b2133e 100644 (file)
@@ -134,7 +134,7 @@ func (ws *webseedPeer) intoSpec(begin, end RequestIndex) webseed.RequestSpec {
 }
 
 func (ws *webseedPeer) spawnRequest(begin, end RequestIndex, logger *slog.Logger) {
-       extWsReq := ws.client.StartNewRequest(ws.intoSpec(begin, end), logger)
+       extWsReq := ws.client.StartNewRequest(ws.peer.closedCtx, ws.intoSpec(begin, end), logger)
        wsReq := webseedRequest{
                logger:  logger,
                request: extWsReq,
@@ -263,7 +263,9 @@ func (ws *webseedPeer) maxChunkDiscard() RequestIndex {
        return RequestIndex(int(intCeilDiv(webseed.MaxDiscardBytes, ws.peer.t.chunkSize)))
 }
 
-func (ws *webseedPeer) keepReading(wr *webseedRequest) bool {
+func (ws *webseedPeer) wantedChunksInDiscardWindow(wr *webseedRequest) bool {
+       // Shouldn't call this if request is at the end already.
+       panicif.GreaterThanOrEqual(wr.next, wr.end)
        for ri := wr.next; ri < wr.end && ri <= wr.next+ws.maxChunkDiscard(); ri++ {
                if ws.wantChunk(ri) {
                        return true
@@ -306,8 +308,10 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) {
                err = ws.peer.receiveChunk(&msg)
                stop := err != nil || wr.next >= wr.end
                if !stop {
-                       if !ws.keepReading(wr) {
-                               wr.Cancel("finished or discarded")
+                       if !ws.wantedChunksInDiscardWindow(wr) {
+                       // This cancels the stream, but we don't stop reading to make the most of the
+                               // buffered body.
+                               wr.Cancel("no wanted chunks in discard window")
                        }
                }
                ws.peer.locker().Unlock()
index c0db9dbe5041011e00447d9fbf3132bbbbb8f85d..bf267fa3c69058b19f5bfe6a1c993f9f8bc017c6 100644 (file)
@@ -27,11 +27,11 @@ func (me *webseedRequest) Close() {
 }
 
 // Record that it was exceptionally cancelled.
-func (me *webseedRequest) Cancel(reason string) {
-       me.request.Cancel()
+func (me *webseedRequest) Cancel(cause string) {
+       me.request.Cancel(stringError(cause))
        if !me.cancelled.Swap(true) {
                if webseed.PrintDebug {
-                       me.logger.Debug("cancelled", "reason", reason)
+                       me.logger.Debug("cancelled", "cause", cause)
                }
        }
 }
index 2076e2c5f838d92cc4b6f85299c445f754533bb9..c15b146e69bb4542972e2757afbc2014ea6e0f3b 100644 (file)
@@ -46,14 +46,14 @@ type requestPart struct {
 }
 
 type Request struct {
-       cancel func()
+       cancel context.CancelCauseFunc
        Body   io.Reader
        // Closed with error to unstick copy routine when context isn't checked.
        bodyPipe *io.PipeReader
 }
 
-func (r Request) Cancel() {
-       r.cancel()
+func (r Request) Cancel(cause error) {
+       r.cancel(cause)
 }
 
 func (r Request) Close() {
@@ -106,8 +106,8 @@ func (ws *Client) UrlForFileIndex(fileIndex int) string {
        return urlForFileIndex(ws.Url, fileIndex, ws.info, ws.PathEscaper)
 }
 
-func (ws *Client) StartNewRequest(r RequestSpec, debugLogger *slog.Logger) Request {
-       ctx, cancel := context.WithCancel(context.TODO())
+func (ws *Client) StartNewRequest(ctx context.Context, r RequestSpec, debugLogger *slog.Logger) Request {
+       ctx, cancel := context.WithCancelCause(ctx)
        var requestParts []requestPart
        for i, e := range ws.fileIndex.LocateIter(r) {
                req, err := newRequest(