func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result
+ close(webseedRequest.Result) // one-shot
// We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
// sure if we can divine which errors indicate cancellation on our end without hitting the
// network though.
- ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+ if len(result.Bytes) != 0 || result.Err == nil {
+ // Increment ChunksRead and friends
+ ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+ }
ws.peer.readBytes(int64(len(result.Bytes)))
ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock()
if result.Err != nil {
if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
+ // // Here lies my attempt to extract something concrete from Go's error system. RIP.
// cfg := spew.NewDefaultConfig()
// cfg.DisableMethods = true
// cfg.Dump(result.Err)
req *http.Request
e segments.Extent
result chan requestPartResult
+ start func()
}
type Request struct {
result: make(chan requestPartResult, 1),
e: e,
}
- go func() {
- resp, err := ws.HttpClient.Do(req)
- part.result <- requestPartResult{
- resp: resp,
- err: err,
- }
- }()
+ part.start = func() {
+ go func() {
+ resp, err := ws.HttpClient.Do(req)
+ part.result <- requestPartResult{
+ resp: resp,
+ err: err,
+ }
+ }()
+ }
requestParts = append(requestParts, part)
return true
}) {
func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
result := <-part.result
+ // Make sure there's no further results coming, it should be a one-shot channel.
+ close(part.result)
if result.err != nil {
return result.err
}
}
}
-func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
+func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
var buf bytes.Buffer
- firstErr := make(chan error, 1)
- go func() {
- for _, part := range parts {
- err := recvPartResult(ctx, &buf, part)
- if err != nil {
- // Ensure no further unnecessary response reads occur.
- cancel()
- select {
- case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err):
- default:
- }
- }
- }
- select {
- case firstErr <- nil:
- default:
+ for _, part := range parts {
+ part.start()
+ err = recvPartResult(ctx, &buf, part)
+ if err != nil {
+ err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
+ break
}
- }()
- // This can't be merged into the return statement, because buf.Bytes is called first!
- err := <-firstErr
+ }
return buf.Bytes(), err
}