]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Do webseed request parts sequentially
authorMatt Joiner <anacrolix@gmail.com>
Thu, 2 Dec 2021 02:47:06 +0000 (13:47 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 6 Dec 2021 04:22:00 +0000 (15:22 +1100)
This means we can treat the number of bytes in the result with enough accuracy to decide if we should count it as a wasted chunk. Also I'm not sure why it was a good idea to do parts of a request in parallel anyway, it could just lead to spikes in outstanding requests to the webseed, rather than sticking to the predictable maxRequests limit.

webseed-peer.go
webseed/client.go

index cea3f9de830078fc7a2a81684c13c0df7dee15bd..4cf27d1388e18825178b1b6d3da87dc5f65b3120 100644 (file)
@@ -125,10 +125,14 @@ func (ws *webseedPeer) onClose() {
 
 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
        result := <-webseedRequest.Result
+       close(webseedRequest.Result) // one-shot
        // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
        // sure if we can divine which errors indicate cancellation on our end without hitting the
        // network though.
-       ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+       if len(result.Bytes) != 0 || result.Err == nil {
+               // Increment ChunksRead and friends
+               ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+       }
        ws.peer.readBytes(int64(len(result.Bytes)))
        ws.peer.t.cl.lock()
        defer ws.peer.t.cl.unlock()
@@ -138,6 +142,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
        if result.Err != nil {
                if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
                        ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
+                       // // Here lies my attempt to extract something concrete from Go's error system. RIP.
                        // cfg := spew.NewDefaultConfig()
                        // cfg.DisableMethods = true
                        // cfg.Dump(result.Err)
index 5dc9d2c46a726f7fd66ec35f544503d9f4a0b3f3..5752575de9065403249437e82bdab4ebb5fcf305 100644 (file)
@@ -26,6 +26,7 @@ type requestPart struct {
        req    *http.Request
        e      segments.Extent
        result chan requestPartResult
+       start  func()
 }
 
 type Request struct {
@@ -79,13 +80,15 @@ func (ws *Client) NewRequest(r RequestSpec) Request {
                        result: make(chan requestPartResult, 1),
                        e:      e,
                }
-               go func() {
-                       resp, err := ws.HttpClient.Do(req)
-                       part.result <- requestPartResult{
-                               resp: resp,
-                               err:  err,
-                       }
-               }()
+               part.start = func() {
+                       go func() {
+                               resp, err := ws.HttpClient.Do(req)
+                               part.result <- requestPartResult{
+                                       resp: resp,
+                                       err:  err,
+                               }
+                       }()
+               }
                requestParts = append(requestParts, part)
                return true
        }) {
@@ -116,6 +119,8 @@ func (me ErrBadResponse) Error() string {
 
 func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
        result := <-part.result
+       // Make sure there's no further results coming, it should be a one-shot channel.
+       close(part.result)
        if result.err != nil {
                return result.err
        }
@@ -165,29 +170,15 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error
        }
 }
 
-func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) {
-       ctx, cancel := context.WithCancel(ctx)
-       defer cancel()
+func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
        var buf bytes.Buffer
-       firstErr := make(chan error, 1)
-       go func() {
-               for _, part := range parts {
-                       err := recvPartResult(ctx, &buf, part)
-                       if err != nil {
-                               // Ensure no further unnecessary response reads occur.
-                               cancel()
-                               select {
-                               case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err):
-                               default:
-                               }
-                       }
-               }
-               select {
-               case firstErr <- nil:
-               default:
+       for _, part := range parts {
+               part.start()
+               err = recvPartResult(ctx, &buf, part)
+               if err != nil {
+                       err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
+                       break
                }
-       }()
-       // This can't be merged into the return statement, because buf.Bytes is called first!
-       err := <-firstErr
+       }
        return buf.Bytes(), err
 }