Sergey Matveev's repositories - btrtrc.git/commitdiff
Switch to reading consecutive incomplete chunks
author Matt Joiner <anacrolix@gmail.com>
Mon, 1 Feb 2021 01:22:31 +0000 (12:22 +1100)
committer Matt Joiner <anacrolix@gmail.com>
Mon, 1 Feb 2021 23:41:38 +0000 (10:41 +1100)
This fixes a race where a sqlite conn isn't reserved for the read part of a MarkComplete operation after the write has already begun.

storage/piece_resource.go
storage/sqlite/sqlite-storage.go

index 466d157ca088c24dda6d5fa6c2c46b01689bb2b4..43f98da70cd237d75fd55fc07f09db00fa7fa7a1 100644 (file)
@@ -2,6 +2,7 @@ package storage
 
 import (
        "bytes"
+       "fmt"
        "io"
        "path"
        "sort"
@@ -57,8 +58,8 @@ type PieceProvider interface {
        resource.Provider
 }
 
-type ConsecutiveChunkWriter interface {
-       WriteConsecutiveChunks(prefix string, _ io.Writer) (int64, error)
+type ConsecutiveChunkReader interface {
+       ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
 }
 
 type piecePerResourcePiece struct {
@@ -69,18 +70,27 @@ type piecePerResourcePiece struct {
 var _ io.WriterTo = piecePerResourcePiece{}
 
 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
-       if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
-               if s.mustIsComplete() {
-                       return ccw.WriteConsecutiveChunks(s.completedInstancePath(), w)
-               } else {
-                       return s.writeConsecutiveIncompleteChunks(ccw, w)
+       if s.mustIsComplete() {
+               r, err := s.completed().Get()
+               if err != nil {
+                       return 0, fmt.Errorf("getting complete instance: %w", err)
                }
+               defer r.Close()
+               return io.Copy(w, r)
+       }
+       if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
+               return s.writeConsecutiveIncompleteChunks(ccr, w)
        }
        return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
 }
 
-func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkWriter, w io.Writer) (int64, error) {
-       return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w)
+func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
+       r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
+       if err != nil {
+               return 0, err
+       }
+       defer r.Close()
+       return io.Copy(w, r)
 }
 
 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
@@ -107,18 +117,18 @@ type SizedPutter interface {
 
 func (s piecePerResourcePiece) MarkComplete() error {
        incompleteChunks := s.getChunks()
-       r, w := io.Pipe()
-       go func() {
-               var err error
-               if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
-                       _, err = s.writeConsecutiveIncompleteChunks(ccw, w)
-               } else {
-                       _, err = io.Copy(w, io.NewSectionReader(incompleteChunks, 0, s.mp.Length()))
+       r, err := func() (io.ReadCloser, error) {
+               if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
+                       return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
                }
-               w.CloseWithError(err)
+               return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
        }()
+       if err != nil {
+               return fmt.Errorf("getting incomplete chunks reader: %w", err)
+       }
+       defer r.Close()
        completedInstance := s.completed()
-       err := func() error {
+       err = func() error {
                if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
                        return sp.PutSized(r, s.mp.Length())
                } else {
index 5042be0481f39162c6950767398a6eb1e176dec2..fb2bea4cd0c39d9495931bdbd3557e0b87fe0901 100644 (file)
@@ -267,6 +267,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
                NumConns:              opts.NumConns,
                NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory,
                BatchWrites:           true,
+               //BatchWrites:           opts.NumConns > 1,
        }, nil
 }
 
@@ -355,34 +356,43 @@ type provider struct {
        closeErr error
 }
 
-var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
+var _ storage.ConsecutiveChunkReader = (*provider)(nil)
 
-func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) {
-       err = p.withConn(func(_ context.Context, conn conn) error {
-               err = io.EOF
-               err = sqlitex.Exec(conn, `
+func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
+       runner, err := p.getReadWithConnRunner()
+       if err != nil {
+               return nil, err
+       }
+       r, w := io.Pipe()
+       go func() {
+               err = runner(func(_ context.Context, conn conn) error {
+                       var written int64
+                       err = sqlitex.Exec(conn, `
                                select
                                        data,
                                        cast(substr(name, ?+1) as integer) as offset
                                from blob
                                where name like ?||'%'
                                order by offset`,
-                       func(stmt *sqlite.Stmt) error {
-                               offset := stmt.ColumnInt64(1)
-                               if offset != written {
-                                       return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
-                               }
-                               r := stmt.ColumnReader(0)
-                               w1, err := io.Copy(w, r)
-                               written += w1
-                               return err
-                       },
-                       len(prefix),
-                       prefix,
-               )
-               return err
-       }, false, 0)
-       return
+                               func(stmt *sqlite.Stmt) error {
+                                       offset := stmt.ColumnInt64(1)
+                                       if offset != written {
+                                               return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
+                                       }
+                                       // TODO: Avoid intermediate buffers here
+                                       r := stmt.ColumnReader(0)
+                                       w1, err := io.Copy(w, r)
+                                       written += w1
+                                       return err
+                               },
+                               len(prefix),
+                               prefix,
+                       )
+                       return err
+               })
+               w.CloseWithError(err)
+       }()
+       return r, nil
 }
 
 func (me *provider) Close() error {
@@ -502,13 +512,27 @@ func (p *provider) withConn(with withConn, write bool, skip int) error {
                p.closeMu.RUnlock()
                return <-done
        } else {
-               conn := p.pool.Get(context.TODO())
-               if conn == nil {
-                       return errors.New("couldn't get pool conn")
+               runner, err := p.getReadWithConnRunner()
+               if err != nil {
+                       return err
                }
+               return runner(with)
+       }
+}
+
+// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
+// function, the runner *must* be used or the conn is leaked.
+func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
+       conn := p.pool.Get(context.TODO())
+       if conn == nil {
+               err = errors.New("couldn't get pool conn")
+               return
+       }
+       with = func(with withConn) error {
                defer p.pool.Put(conn)
-               return runQueryWithLabels(with, getLabels(skip+1), conn)
+               return runQueryWithLabels(with, getLabels(1), conn)
        }
+       return
 }
 
 type withConn func(context.Context, conn) error