import (
"bytes"
+ "fmt"
"io"
"path"
"sort"
resource.Provider
}
// ConsecutiveChunkReader is implemented by resource providers that can stream
// all chunks stored under a resource prefix as one contiguous reader, in
// offset order.
type ConsecutiveChunkReader interface {
	ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
}
type piecePerResourcePiece struct {
var _ io.WriterTo = piecePerResourcePiece{}
func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
- if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
- if s.mustIsComplete() {
- return ccw.WriteConsecutiveChunks(s.completedInstancePath(), w)
- } else {
- return s.writeConsecutiveIncompleteChunks(ccw, w)
+ if s.mustIsComplete() {
+ r, err := s.completed().Get()
+ if err != nil {
+ return 0, fmt.Errorf("getting complete instance: %w", err)
}
+ defer r.Close()
+ return io.Copy(w, r)
+ }
+ if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
+ return s.writeConsecutiveIncompleteChunks(ccr, w)
}
return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
}
-func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkWriter, w io.Writer) (int64, error) {
- return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w)
+func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
+ r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
+ if err != nil {
+ return 0, err
+ }
+ defer r.Close()
+ return io.Copy(w, r)
}
// Returns if the piece is complete. Ok should be true, because we are the definitive source of
func (s piecePerResourcePiece) MarkComplete() error {
incompleteChunks := s.getChunks()
- r, w := io.Pipe()
- go func() {
- var err error
- if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
- _, err = s.writeConsecutiveIncompleteChunks(ccw, w)
- } else {
- _, err = io.Copy(w, io.NewSectionReader(incompleteChunks, 0, s.mp.Length()))
+ r, err := func() (io.ReadCloser, error) {
+ if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
+ return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
}
- w.CloseWithError(err)
+ return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
}()
+ if err != nil {
+ return fmt.Errorf("getting incomplete chunks reader: %w", err)
+ }
+ defer r.Close()
completedInstance := s.completed()
- err := func() error {
+ err = func() error {
if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
return sp.PutSized(r, s.mp.Length())
} else {
NumConns: opts.NumConns,
NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory,
BatchWrites: true,
+ //BatchWrites: opts.NumConns > 1,
}, nil
}
closeErr error
}
-var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
+var _ storage.ConsecutiveChunkReader = (*provider)(nil)
-func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written int64, err error) {
- err = p.withConn(func(_ context.Context, conn conn) error {
- err = io.EOF
- err = sqlitex.Exec(conn, `
+func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
+ runner, err := p.getReadWithConnRunner()
+ if err != nil {
+ return nil, err
+ }
+ r, w := io.Pipe()
+ go func() {
+ err = runner(func(_ context.Context, conn conn) error {
+ var written int64
+ err = sqlitex.Exec(conn, `
select
data,
cast(substr(name, ?+1) as integer) as offset
from blob
where name like ?||'%'
order by offset`,
- func(stmt *sqlite.Stmt) error {
- offset := stmt.ColumnInt64(1)
- if offset != written {
- return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
- }
- r := stmt.ColumnReader(0)
- w1, err := io.Copy(w, r)
- written += w1
- return err
- },
- len(prefix),
- prefix,
- )
- return err
- }, false, 0)
- return
+ func(stmt *sqlite.Stmt) error {
+ offset := stmt.ColumnInt64(1)
+ if offset != written {
+ return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
+ }
+ // TODO: Avoid intermediate buffers here
+ r := stmt.ColumnReader(0)
+ w1, err := io.Copy(w, r)
+ written += w1
+ return err
+ },
+ len(prefix),
+ prefix,
+ )
+ return err
+ })
+ w.CloseWithError(err)
+ }()
+ return r, nil
}
func (me *provider) Close() error {
p.closeMu.RUnlock()
return <-done
} else {
- conn := p.pool.Get(context.TODO())
- if conn == nil {
- return errors.New("couldn't get pool conn")
+ runner, err := p.getReadWithConnRunner()
+ if err != nil {
+ return err
}
+ return runner(with)
+ }
+}
+
+// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
+// function, the runner *must* be used or the conn is leaked.
+func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
+ conn := p.pool.Get(context.TODO())
+ if conn == nil {
+ err = errors.New("couldn't get pool conn")
+ return
+ }
+ with = func(with withConn) error {
defer p.pool.Put(conn)
- return runQueryWithLabels(with, getLabels(skip+1), conn)
+ return runQueryWithLabels(with, getLabels(1), conn)
}
+ return
}
// withConn is a callback executed with a borrowed database conn.
type withConn func(context.Context, conn) error