From 2a7352aad27c037ec702dfa12760dd8a39f158bd Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 27 Oct 2020 12:59:07 +1100 Subject: [PATCH] Don't block trying to fill entire Reader.Read This conforms more to the contract in io.Reader. It's possible the old behaviour was better in reducing overhead, but that can be iterated on (or added as comments next time). --- fs/torrentfs_test.go | 23 ++++++++++++++++------- reader.go | 36 +++++++++++++++++++----------------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index cd4dfb11..068985df 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -206,15 +206,24 @@ func TestDownloadOnDemand(t *testing.T) { var attr fuse.Attr node.Attr(netContext.Background(), &attr) size := attr.Size - resp := &fuse.ReadResponse{ - Data: make([]byte, size), - } + data := make([]byte, size) h, err := node.(fusefs.NodeOpener).Open(context.TODO(), nil, nil) require.NoError(t, err) - h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{ - Size: int(size), - }, resp) - assert.EqualValues(t, testutil.GreetingFileContents, resp.Data) + + // torrent.Reader.Read no longer tries to fill the entire read buffer, so this is a ReadFull for + // fusefs. + var n int + for n < len(data) { + resp := fuse.ReadResponse{Data: data[n:]} + err := h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{ + Size: int(size) - n, + Offset: int64(n), + }, &resp) + assert.NoError(t, err) + n += len(resp.Data) + } + + assert.EqualValues(t, testutil.GreetingFileContents, data) } func TestIsSubPath(t *testing.T) { diff --git a/reader.go b/reader.go index 8a74873b..1f68fdbe 100644 --- a/reader.go +++ b/reader.go @@ -140,7 +140,9 @@ func (r *reader) Read(b []byte) (n int, err error) { } func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { - // This is set under the Client lock if the Context is canceled. + // This is set under the Client lock if the Context is canceled. I think we coordinate on a + // separate variable so as to avoid false negatives with race conditions due to Contexts being + // synchronized. var ctxErr error if ctx.Done() != nil { ctx, cancel := context.WithCancel(ctx) @@ -158,22 +160,19 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // other purposes. That seems reasonable, but unusual. r.opMu.Lock() defer r.opMu.Unlock() - for len(b) != 0 { - var n1 int - n1, err = r.readOnceAt(b, r.pos, &ctxErr) - if n1 == 0 { - if err == nil { - panic("expected error") - } - break + n, err = r.readOnceAt(b, r.pos, &ctxErr) + if n == 0 { + if err == nil { + panic("expected error") + } else { + return } - b = b[n1:] - n += n1 - r.mu.Lock() - r.pos += int64(n1) - r.posChanged() - r.mu.Unlock() } + + r.mu.Lock() + r.pos += int64(n) + r.posChanged() + r.mu.Unlock() if r.pos >= r.length { err = io.EOF } else if err == io.EOF { @@ -184,7 +183,7 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // Wait until some data should be available to read. Tickles the client if it // isn't. Returns how much should be readable without blocking. -func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64, err error) { +func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) { r.t.cl.lock() defer r.t.cl.unlock() for { @@ -200,6 +199,9 @@ func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64, e err = *ctxErr return } + if !wait { + return + } r.waitReadable(pos) } } @@ -218,7 +220,7 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro } for { var avail int64 - avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr) + avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0) if avail == 0 { return } -- 2.48.1