]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Don't block trying to fill entire Reader.Read
authorMatt Joiner <anacrolix@gmail.com>
Tue, 27 Oct 2020 01:59:07 +0000 (12:59 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 27 Oct 2020 01:59:07 +0000 (12:59 +1100)
This conforms more to the contract in io.Reader. It's possible the old behaviour was better in reducing overhead, but that can be iterated on (or added as comments next time).

fs/torrentfs_test.go
reader.go

index cd4dfb1180536c622322eef91558e0041063e371..068985dfc25efd4cfa6cb513d5c141bc99bbedd7 100644 (file)
@@ -206,15 +206,24 @@ func TestDownloadOnDemand(t *testing.T) {
        var attr fuse.Attr
        node.Attr(netContext.Background(), &attr)
        size := attr.Size
-       resp := &fuse.ReadResponse{
-               Data: make([]byte, size),
-       }
+       data := make([]byte, size)
        h, err := node.(fusefs.NodeOpener).Open(context.TODO(), nil, nil)
        require.NoError(t, err)
-       h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
-               Size: int(size),
-       }, resp)
-       assert.EqualValues(t, testutil.GreetingFileContents, resp.Data)
+
+       // torrent.Reader.Read no longer tries to fill the entire read buffer, so this is a ReadFull for
+       // fusefs.
+       var n int
+       for n < len(data) {
+               resp := fuse.ReadResponse{Data: data[n:]}
+               err := h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
+                       Size:   int(size) - n,
+                       Offset: int64(n),
+               }, &resp)
+               assert.NoError(t, err)
+               n += len(resp.Data)
+       }
+
+       assert.EqualValues(t, testutil.GreetingFileContents, data)
 }
 
 func TestIsSubPath(t *testing.T) {
index 8a74873b8afec4eb77095e47676ed7a077c4a54f..1f68fdbe6c94e658acbc44a5a37048748e47ddf4 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -140,7 +140,9 @@ func (r *reader) Read(b []byte) (n int, err error) {
 }
 
 func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
-       // This is set under the Client lock if the Context is canceled.
+       // This is set under the Client lock if the Context is canceled. I think we coordinate on a
+       // separate variable so as to avoid false negatives with race conditions due to Contexts being
+       // synchronized.
        var ctxErr error
        if ctx.Done() != nil {
                ctx, cancel := context.WithCancel(ctx)
@@ -158,22 +160,19 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
        // other purposes. That seems reasonable, but unusual.
        r.opMu.Lock()
        defer r.opMu.Unlock()
-       for len(b) != 0 {
-               var n1 int
-               n1, err = r.readOnceAt(b, r.pos, &ctxErr)
-               if n1 == 0 {
-                       if err == nil {
-                               panic("expected error")
-                       }
-                       break
+       n, err = r.readOnceAt(b, r.pos, &ctxErr)
+       if n == 0 {
+               if err == nil {
+                       panic("expected error")
+               } else {
+                       return
                }
-               b = b[n1:]
-               n += n1
-               r.mu.Lock()
-               r.pos += int64(n1)
-               r.posChanged()
-               r.mu.Unlock()
        }
+
+       r.mu.Lock()
+       r.pos += int64(n)
+       r.posChanged()
+       r.mu.Unlock()
        if r.pos >= r.length {
                err = io.EOF
        } else if err == io.EOF {
@@ -184,7 +183,7 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
 
 // Wait until some data should be available to read. Tickles the client if it
 // isn't. Returns how much should be readable without blocking.
-func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64, err error) {
+func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) {
        r.t.cl.lock()
        defer r.t.cl.unlock()
        for {
@@ -200,6 +199,9 @@ func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64, e
                        err = *ctxErr
                        return
                }
+               if !wait {
+                       return
+               }
                r.waitReadable(pos)
        }
 }
@@ -218,7 +220,7 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
        }
        for {
                var avail int64
-               avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr)
+               avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0)
                if avail == 0 {
                        return
                }