)
type piecePerResource struct {
- p PieceProvider
+ rp PieceProvider
+ opts ResourcePiecesOpts
+}
+
+type ResourcePiecesOpts struct {
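+ // Leave the incomplete chunk resources in place when a piece is marked complete, rather than
+ // deleting them synchronously. Useful when the provider is expected to clean them up itself,
+ // which can be cheaper for some backends (see the comment in MarkComplete below).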
+ LeaveIncompleteChunks bool
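+ // Pass the expected piece length to the completed-data put when the instance implements
+ // SizedPutter, so the backend can preallocate space for it.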
+ AllowSizedPuts bool
}
func NewResourcePieces(p PieceProvider) ClientImpl {
+ return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
+}
+
+func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
return &piecePerResource{
- p: p,
+ rp: p,
+ opts: opts,
}
}
func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
return piecePerResourcePiece{
- mp: p,
- rp: s.p,
+ mp: p,
+ piecePerResource: s,
}
}
type piecePerResourcePiece struct {
mp metainfo.Piece
- rp resource.Provider
+ piecePerResource
}
var _ io.WriterTo = piecePerResourcePiece{}
}
}
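+// SizedPutter is implemented by resource instances that can accept the data's total length up
+// front, for example to preallocate the destination before streaming into it.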
+type SizedPutter interface {
+ PutSized(io.Reader, int64) error
+}
+
func (s piecePerResourcePiece) MarkComplete() error {
incompleteChunks := s.getChunks()
r, w := io.Pipe()
}
w.CloseWithError(err)
}()
- err := s.completed().Put(r)
- if err == nil {
+ completedInstance := s.completed()
+ err := func() error {
+ if sp, ok := completedInstance.(SizedPutter); ok && s.opts.AllowSizedPuts {
+ return sp.PutSized(r, s.mp.Length())
+ } else {
+ return completedInstance.Put(r)
+ }
+ }()
+ if err == nil && !s.opts.LeaveIncompleteChunks {
+ // I think we do this synchronously here since we don't want callers to act on the completed
+ // piece if we're concurrently still deleting chunks. The caller may decide to start
+ // downloading chunks again and won't expect us to delete them. It seems to be much faster
+ // to let the resource provider do this if possible.
var wg sync.WaitGroup
for _, c := range incompleteChunks {
wg.Add(1)
-- We have to opt into this before creating any tables, or else run a full vacuum afterwards, for
-- it to take effect. It means we can trim the database file size with partial vacuums without
-- having to do a full vacuum, which locks everything.
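+-- Like auto_vacuum, the page size only takes effect on a fresh database or after a full vacuum.
+-- Larger pages should reduce per-page overhead for the big blob rows stored here.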
+pragma page_size=16384;
pragma auto_vacuum=incremental;
create table if not exists blob (
conns.Close()
return
}
- store := storage.NewResourcePieces(prov)
+ store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
+ LeaveIncompleteChunks: true,
+ AllowSizedPuts: true,
+ })
return struct {
storage.ClientImpl
io.Closer
if err != nil {
return
}
- writes := make(chan writeRequest, 1<<(20-14))
- prov := &provider{pool: pool, writes: writes, opts: opts}
- runtime.SetFinalizer(prov, func(p *provider) {
- // This is done in a finalizer, as it's easier than trying to synchronize on whether the
- // channel has been closed. It also means that the provider writer can pass back errors from
- // a closed ConnPool.
- close(p.writes)
- })
- go providerWriter(writes, prov.pool)
+ prov := &provider{pool: pool, opts: opts}
+ if opts.BatchWrites {
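+ // The dedicated batch writer goroutine holds one conn for its entire lifetime (see
+ // providerWriter), so at least one other conn must be left for everything else.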
+ if opts.NumConns < 2 {
+ err = errors.New("batch writes require more than one conn")
+ return
+ }
+ writes := make(chan writeRequest)
+ prov.writes = writes
+ // This is retained for backwards compatibility. It may not be necessary.
+ runtime.SetFinalizer(prov, func(p *provider) {
+ p.Close()
+ })
+ go providerWriter(writes, prov.pool)
+ }
return prov, nil
}
}
type provider struct {
- pool ConnPool
- writes chan<- writeRequest
- opts ProviderOpts
+ pool ConnPool
+ writes chan<- writeRequest
+ opts ProviderOpts
+ closed sync.Once
+ closeErr error
}
var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
}
func (me *provider) Close() error {
- return me.pool.Close()
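+ // Close may also be invoked by the finalizer set at construction, so it has to be idempotent.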
+ me.closed.Do(func() {
+ if me.writes != nil {
+ close(me.writes)
+ }
+ me.closeErr = me.pool.Close()
+ })
+ return me.closeErr
}
type writeRequest struct {
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
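+ // Hold a single conn for the life of the writer; each batch of writes below is wrapped in a
+ // savepoint on it.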
+ conn := pool.Get(context.TODO())
+ if conn == nil {
+ return
+ }
+ defer pool.Put(conn)
for {
first, ok := <-writes
if !ok {
var buf []func()
var cantFail error
func() {
- conn := pool.Get(context.TODO())
- if conn == nil {
- return
- }
- defer pool.Put(conn)
defer sqlitex.Save(conn)(&cantFail)
firstErr := first.query(conn)
buf = append(buf, func() { first.done <- firstErr })
return conn.OpenBlob("main", "blob", "data", rowid, write)
}
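+// PutSized creates the blob row at its final size with zeroblob() and then streams the data into
+// it via SQLite's incremental blob I/O, so the whole value doesn't have to be buffered in memory.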
+func (i instance) PutSized(reader io.Reader, size int64) (err error) {
+ err = i.withConn(func(conn conn) error {
+ err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
+ nil,
+ i.location, size)
+ if err != nil {
+ return err
+ }
+ blob, err := i.openBlob(conn, true, false)
+ if err != nil {
+ return err
+ }
+ defer blob.Close()
+ _, err = io.Copy(blob, reader)
+ return err
+ }, true)
+ return
+}
+
func (i instance) Put(reader io.Reader) (err error) {
var buf bytes.Buffer
_, err = io.Copy(&buf, reader)
if err != nil {
return err
}
- err = i.withConn(func(conn conn) error {
- for range iter.N(10) {
- err = sqlitex.Exec(conn,
- "insert or replace into blob(name, data) values(?, cast(? as blob))",
- nil,
- i.location, buf.Bytes())
- if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
- log.Print("sqlite busy")
- time.Sleep(time.Second)
- continue
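+ // The PutSized path is left disabled here for now; writing the buffered piece as a single bound
+ // blob parameter presumably still performs well enough for this case.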
+ if false {
+ return i.PutSized(&buf, int64(buf.Len()))
+ } else {
+ return i.withConn(func(conn conn) error {
+ for range iter.N(10) {
+ err = sqlitex.Exec(conn,
+ "insert or replace into blob(name, data) values(?, cast(? as blob))",
+ nil,
+ i.location, buf.Bytes())
+ if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
+ log.Print("sqlite busy")
+ time.Sleep(time.Second)
+ continue
+ }
+ break
}
- break
- }
- return err
- }, true)
- return
+ return err
+ }, true)
+ }
}
type fileInfo struct {
func TestSimultaneousIncrementalBlob(t *testing.T) {
_, p := newConnsAndProv(t, NewPoolOpts{
- NumConns: 2,
+ NumConns: 3,
ConcurrentBlobReads: true,
})
a, err := p.NewInstance("a")
rand.Read(data)
dbPath := filepath.Join(b.TempDir(), "storage.db")
b.Logf("storage db path: %q", dbPath)
- ci, err := NewPiecesStorage(NewPoolOpts{Path: dbPath, Capacity: pieceSize})
+ ci, err := NewPiecesStorage(NewPoolOpts{
+ Path: dbPath,
+ Capacity: pieceSize,
+ ConcurrentBlobReads: true,
+ })
c.Assert(err, qt.IsNil)
defer ci.Close()
ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
Length: pieceSize,
},
})
+ // Do it untimed the first time to prime the cache.
+ storage.BenchmarkPieceMarkComplete(b, pi, data)
b.ResetTimer()
for range iter.N(b.N) {
storage.BenchmarkPieceMarkComplete(b, pi, data)