return
}
return &client{
- prov: prov,
- conn: prov.pool.Get(nil),
+ prov: prov,
+ conn: prov.pool.Get(nil),
+ blobs: make(map[string]*sqlite.Blob),
}, nil
}
type client struct {
- l sync.Mutex
- prov *provider
- conn conn
- blob *sqlite.Blob
+ l sync.Mutex
+ prov *provider
+ conn conn
+ blobs map[string]*sqlite.Blob
}
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
}
func (c *client) Close() error {
- if c.blob != nil {
- c.blob.Close()
+ for _, b := range c.blobs {
+ b.Close()
}
c.prov.pool.Put(c.conn)
return c.prov.Close()
t.c.l.Lock()
defer t.c.l.Unlock()
name := p.Hash().HexString()
- return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
+ return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
}
func (t torrent) Close() error {
type piece struct {
conn conn
- name string
l *sync.Mutex
+ name string
+ blobs map[string]*sqlite.Blob
length int64
- blob **sqlite.Blob
-}
-
-func (p2 piece) getBlob() *sqlite.Blob {
- rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
- if err != nil {
- panic(err)
- }
- if *p2.blob != nil {
- err := (*p2.blob).Close()
- if err != nil {
- panic(err)
- }
- *p2.blob = nil
- }
- *p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
- if err != nil {
- panic(err)
- }
- return *p2.blob
}
func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
if changes != 1 {
panic(changes)
}
+ p2.blobWouldExpire()
return nil
}
+// blobWouldExpire closes and discards the cached blob handle for this
+// piece, so the next getBlob call reopens it against the current row.
+// SQLite blob handles are bound to a specific rowid and abort once the
+// underlying row changes, hence the eviction before/after row updates.
+// NOTE(review): the Close error is silently dropped — confirm intended.
+func (p2 piece) blobWouldExpire() {
+ blob, ok := p2.blobs[p2.name]
+ if !ok {
+ return
+ }
+ blob.Close()
+ delete(p2.blobs, p2.name)
+}
+
func (p2 piece) MarkNotComplete() error {
+ // Evict the cached blob handle first: the row update below would
+ // invalidate any open handle on this piece's row.
+ p2.blobWouldExpire()
 return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
}
}
return
}
+
+// closeBlobIfExists closes and forgets any cached blob handle for this
+// piece's name. It is behaviorally identical to blobWouldExpire;
+// delegate to it so the eviction logic lives in exactly one place
+// instead of being duplicated.
+func (p2 piece) closeBlobIfExists() {
+ p2.blobWouldExpire()
+}
+
+// getBlob returns the open blob handle for this piece's name, using the
+// per-client cache. On a cache miss it locates/creates the backing row
+// via rowidForBlob, opens the "data" column for read-write, and caches
+// the handle. Panics on sqlite errors, matching the file's existing
+// error-handling style in this path.
+func (p2 piece) getBlob() *sqlite.Blob {
+ blob, ok := p2.blobs[p2.name]
+ if !ok {
+ rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
+ if err != nil {
+ panic(err)
+ }
+ blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
+ if err != nil {
+ panic(err)
+ }
+ p2.blobs[p2.name] = blob
+ }
+ return blob
+}
"testing"
_ "github.com/anacrolix/envpprof"
+ "github.com/anacrolix/torrent/storage"
test_storage "github.com/anacrolix/torrent/storage/test"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
func BenchmarkMarkComplete(b *testing.B) {
const pieceSize = test_storage.DefaultPieceSize
- const capacity = test_storage.DefaultCapacity
+ const capacity = test_storage.DefaultNumPieces * pieceSize / 2
c := qt.New(b)
- for _, memory := range []bool{false, true} {
- b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
- for _, batchWrites := range []bool{false, true} {
- b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
- dbPath := filepath.Join(b.TempDir(), "storage.db")
- //b.Logf("storage db path: %q", dbPath)
- newPoolOpts := NewPoolOpts{
- Path: dbPath,
- Capacity: 4*pieceSize - 1,
- NoConcurrentBlobReads: false,
- PageSize: 1 << 14,
- Memory: memory,
- }
- provOpts := func(opts *ProviderOpts) {
- opts.BatchWrites = batchWrites
- }
- b.Run("SqlitePieceStorage", func(b *testing.B) {
- ci, err := NewPiecesStorage(NewPiecesStorageOpts{
- NewPoolOpts: newPoolOpts,
- ProvOpts: provOpts,
+ for _, storage := range []struct {
+ name string
+ maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser
+ }{
+ {"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
+ ci, err := NewDirectStorage(NewDirectStorageOpts{
+ NewPoolOpts: newPoolOpts,
+ ProvOpts: provOpts,
+ })
+ c.Assert(err, qt.IsNil)
+ return ci
+ }},
+ {"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
+ ci, err := NewPiecesStorage(NewPiecesStorageOpts{
+ NewPoolOpts: newPoolOpts,
+ ProvOpts: provOpts,
+ })
+ c.Assert(err, qt.IsNil)
+ return ci
+ }},
+ } {
+ b.Run(storage.name, func(b *testing.B) {
+ for _, memory := range []bool{false, true} {
+ b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+ for _, batchWrites := range []bool{false, true} {
+ b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
+ dbPath := filepath.Join(b.TempDir(), "storage.db")
+ //b.Logf("storage db path: %q", dbPath)
+ newPoolOpts := NewPoolOpts{
+ Path: dbPath,
+ Capacity: capacity,
+ NoConcurrentBlobReads: false,
+ PageSize: 1 << 14,
+ Memory: memory,
+ }
+ provOpts := func(opts *ProviderOpts) {
+ opts.BatchWrites = batchWrites
+ }
+ ci := storage.maker(newPoolOpts, provOpts)
+ defer ci.Close()
+ test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
})
- c.Assert(err, qt.IsNil)
- defer ci.Close()
- test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
- })
- b.Run("SqliteDirect", func(b *testing.B) {
- ci, err := NewDirectStorage(NewDirectStorageOpts{
- NewPoolOpts: newPoolOpts,
- ProvOpts: provOpts,
- })
- c.Assert(err, qt.IsNil)
- defer ci.Close()
- test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
- })
+ }
})
}
})
func BenchmarkPieceMarkComplete(
b *testing.B, ci storage.ClientImpl,
- pieceSize int64, numPieces int, capacity int64,
+ pieceSize int64, numPieces int,
+ // This drives any special handling around capacity that may be configured into the storage
+ // implementation.
+ capacity int64,
) {
const check = true
c := qt.New(b)
}(off)
}
wg.Wait()
+ b.StopTimer()
if capacity == 0 {
pi.MarkNotComplete()
}
+ b.StartTimer()
// This might not apply if users of this benchmark don't cache with the expected capacity.
c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
c.Assert(pi.MarkComplete(), qt.IsNil)
c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
if check {
readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
- c.Assert(err, qt.IsNil)
+ c.Check(err, qt.IsNil)
c.Assert(len(readData), qt.Equals, len(data))
c.Assert(bytes.Equal(readData, data), qt.IsTrue)
}