From 270a2ba1aeb0c14f081b283a4231d2691e448405 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 1 Feb 2021 12:22:31 +1100 Subject: [PATCH] Switch to reading consecutive incomplete chunks This fixes a race where a sqlite conn isn't reserved for the read part of a MarkComplete operation after the write has already begun. --- storage/piece_resource.go | 46 ++++++++++++-------- storage/sqlite/sqlite-storage.go | 74 +++++++++++++++++++++----------- 2 files changed, 77 insertions(+), 43 deletions(-) diff --git a/storage/piece_resource.go b/storage/piece_resource.go index 466d157c..43f98da7 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -2,6 +2,7 @@ package storage import ( "bytes" + "fmt" "io" "path" "sort" @@ -57,8 +58,8 @@ type PieceProvider interface { resource.Provider } -type ConsecutiveChunkWriter interface { - WriteConsecutiveChunks(prefix string, _ io.Writer) (int64, error) +type ConsecutiveChunkReader interface { + ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) } type piecePerResourcePiece struct { @@ -69,18 +70,27 @@ type piecePerResourcePiece struct { var _ io.WriterTo = piecePerResourcePiece{} func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) { - if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok { - if s.mustIsComplete() { - return ccw.WriteConsecutiveChunks(s.completedInstancePath(), w) - } else { - return s.writeConsecutiveIncompleteChunks(ccw, w) + if s.mustIsComplete() { + r, err := s.completed().Get() + if err != nil { + return 0, fmt.Errorf("getting complete instance: %w", err) } + defer r.Close() + return io.Copy(w, r) + } + if ccr, ok := s.rp.(ConsecutiveChunkReader); ok { + return s.writeConsecutiveIncompleteChunks(ccr, w) } return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length())) } -func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkWriter, w io.Writer) (int64, error) { - return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w) +func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) { + r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/") + if err != nil { + return 0, err + } + defer r.Close() + return io.Copy(w, r) } // Returns if the piece is complete. Ok should be true, because we are the definitive source of @@ -107,18 +117,18 @@ type SizedPutter interface { func (s piecePerResourcePiece) MarkComplete() error { incompleteChunks := s.getChunks() - r, w := io.Pipe() - go func() { - var err error - if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok { - _, err = s.writeConsecutiveIncompleteChunks(ccw, w) - } else { - _, err = io.Copy(w, io.NewSectionReader(incompleteChunks, 0, s.mp.Length())) + r, err := func() (io.ReadCloser, error) { + if ccr, ok := s.rp.(ConsecutiveChunkReader); ok { + return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/") } - w.CloseWithError(err) + return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil }() + if err != nil { + return fmt.Errorf("getting incomplete chunks reader: %w", err) + } + defer r.Close() completedInstance := s.completed() - err := func() error { + err = func() error { if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts { return sp.PutSized(r, s.mp.Length()) } else { diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 5042be04..fb2bea4c 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -267,6 +267,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) { NumConns: opts.NumConns, NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory, BatchWrites: true, + //BatchWrites: opts.NumConns > 1, }, nil } @@ -355,34 +356,43 @@ type provider struct { closeErr error } -var _ storage.ConsecutiveChunkWriter = (*provider)(nil) +var _ storage.ConsecutiveChunkReader = (*provider)(nil) -func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) { - err = p.withConn(func(_ context.Context, conn conn) error { - err = io.EOF - err = sqlitex.Exec(conn, ` +func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) { + runner, err := p.getReadWithConnRunner() + if err != nil { + return nil, err + } + r, w := io.Pipe() + go func() { + err = runner(func(_ context.Context, conn conn) error { + var written int64 + err = sqlitex.Exec(conn, ` select data, cast(substr(name, ?+1) as integer) as offset from blob where name like ?||'%' order by offset`, - func(stmt *sqlite.Stmt) error { - offset := stmt.ColumnInt64(1) - if offset != written { - return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written) - } - r := stmt.ColumnReader(0) - w1, err := io.Copy(w, r) - written += w1 - return err - }, - len(prefix), - prefix, - ) - return err - }, false, 0) - return + func(stmt *sqlite.Stmt) error { + offset := stmt.ColumnInt64(1) + if offset != written { + return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written) + } + // TODO: Avoid intermediate buffers here + r := stmt.ColumnReader(0) + w1, err := io.Copy(w, r) + written += w1 + return err + }, + len(prefix), + prefix, + ) + return err + }) + w.CloseWithError(err) + }() + return r, nil } func (me *provider) Close() error { @@ -502,13 +512,27 @@ func (p *provider) withConn(with withConn, write bool, skip int) error { p.closeMu.RUnlock() return <-done } else { - conn := p.pool.Get(context.TODO()) - if conn == nil { - return errors.New("couldn't get pool conn") + runner, err := p.getReadWithConnRunner() + if err != nil { + return err } + return runner(with) + } +} + +// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this +// function, the runner *must* be used or the conn is leaked. +func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) { + conn := p.pool.Get(context.TODO()) + if conn == nil { + err = errors.New("couldn't get pool conn") + return + } + with = func(with withConn) error { defer p.pool.Put(conn) - return runQueryWithLabels(with, getLabels(skip+1), conn) + return runQueryWithLabels(with, getLabels(1), conn) } + return } type withConn func(context.Context, conn) error -- 2.48.1