import (
"errors"
+ "runtime"
"sync"
+ "time"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
NewConnOpts
InitDbOpts
InitConnOpts
+ GcBlobs bool
+ CacheBlobs bool
+ BlobFlushInterval time.Duration
}
// A convenience function that creates a connection pool, resource provider, and a pieces storage
if err != nil {
return
}
- return &client{
+ cl := &client{
conn: conn,
blobs: make(map[string]*sqlite.Blob),
- }, nil
+ opts: opts,
+ }
+ if opts.BlobFlushInterval != 0 {
+ cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
+ }
+ return cl, nil
}
type client struct {
- l sync.Mutex
- conn conn
- blobs map[string]*sqlite.Blob
+ // l guards all mutable state below, including the blobs cache.
+ l sync.Mutex
+ conn conn
+ // blobs caches open sqlite blob handles, keyed by piece name.
+ blobs map[string]*sqlite.Blob
+ // blobFlusher periodically closes cached blobs; only set when
+ // opts.BlobFlushInterval is non-zero.
+ blobFlusher *time.Timer
+ opts NewDirectStorageOpts
+ // closed stops blobFlusherFunc from rescheduling the timer after Close.
+ closed bool
+}
+
+// blobFlusherFunc runs on the blobFlusher timer: it closes all cached blob
+// handles, then reschedules itself unless the client has been closed.
+func (c *client) blobFlusherFunc() {
+ c.l.Lock()
+ defer c.l.Unlock()
+ c.flushBlobs()
+ if !c.closed {
+ c.blobFlusher.Reset(c.opts.BlobFlushInterval)
+ }
+}
+
+// flushBlobs closes every cached blob handle and empties the cache.
+// Callers must hold c.l.
+func (c *client) flushBlobs() {
+ for key, b := range c.blobs {
+ // Need the lock to prevent racing with the GC finalizers.
+ b.Close()
+ delete(c.blobs, key)
+ }
}
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
}
func (c *client) Close() error {
- for _, b := range c.blobs {
- b.Close()
+ c.l.Lock()
+ defer c.l.Unlock()
+ c.flushBlobs()
+ // Set closed under the lock so a concurrently firing blobFlusherFunc
+ // does not reschedule the timer after we stop it below.
+ c.closed = true
+ if c.opts.BlobFlushInterval != 0 {
+ c.blobFlusher.Stop()
}
return c.conn.Close()
}
t.c.l.Lock()
defer t.c.l.Unlock()
name := p.Hash().HexString()
- return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
+ return piece{
+ name,
+ p.Length(),
+ t.c,
+ }
}
func (t torrent) Close() error {
}
type piece struct {
- conn conn
- l *sync.Mutex
name string
- blobs map[string]*sqlite.Blob
length int64
+ // Embed the client so piece shares its lock, connection, blob cache
+ // and options instead of duplicating those fields.
+ *client
}
func (p2 piece) doAtIoWithBlob(
) (n int, err error) {
p2.l.Lock()
defer p2.l.Unlock()
- //defer p2.blobWouldExpire()
+ // When blob caching is disabled, drop the handle as soon as this
+ // operation completes rather than keeping it in the cache.
+ if !p2.opts.CacheBlobs {
+ defer p2.forgetBlob()
+ }
n, err = atIo(p2.getBlob())(p, off)
+ if err == nil {
+ return
+ }
var se sqlite.Error
- if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
+ if !errors.As(err, &se) {
return
}
- p2.blobWouldExpire()
+ // Retry once on errors that indicate a stale blob handle: SQLITE_ABORT,
+ // or — when GC finalizers may have closed the blob under us — the
+ // "invalid blob" SQLITE_ERROR. Any other sqlite error is returned as-is.
+ if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
+ return
+ }
+ p2.forgetBlob()
+ return atIo(p2.getBlob())(p, off)
}
return nil
}
-func (p2 piece) blobWouldExpire() {
+func (p2 piece) forgetBlob() {
blob, ok := p2.blobs[p2.name]
if !ok {
return
if err != nil {
panic(err)
}
+ if p2.opts.GcBlobs {
+ herp := new(byte)
+ runtime.SetFinalizer(herp, func(*byte) {
+ p2.l.Lock()
+ defer p2.l.Unlock()
+ blob.Close()
+ })
+ }
p2.blobs[p2.name] = blob
}
return blob
"path/filepath"
"sync"
"testing"
+ "time"
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/torrent/storage"
func BenchmarkMarkComplete(b *testing.B) {
const pieceSize = test_storage.DefaultPieceSize
- const capacity = test_storage.DefaultNumPieces * pieceSize / 2
+ const noTriggers = false
+ var capacity int64 = test_storage.DefaultNumPieces * pieceSize / 2
+ if noTriggers {
+ // Since we won't push out old pieces, we have to mark them incomplete manually.
+ capacity = 0
+ }
runBench := func(b *testing.B, ci storage.ClientImpl) {
test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
}
opts.Memory = memory
opts.Path = filepath.Join(b.TempDir(), "storage.db")
opts.Capacity = capacity
+ opts.CacheBlobs = true
+ //opts.GcBlobs = true
+ opts.BlobFlushInterval = time.Second
+ opts.NoTriggers = noTriggers
directBench := func(b *testing.B) {
ci, err := NewDirectStorage(opts)
if errors.Is(err, UnexpectedJournalMode) {
defer ci.Close()
runBench(b, ci)
}
- for _, journalMode := range []string{"", "wal", "off", "delete", "memory"} {
+ for _, journalMode := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} {
opts.SetJournalMode = journalMode
b.Run("JournalMode="+journalMode, func(b *testing.B) {
- for _, mmapSize := range []int64{-1, 0, 1 << 24, 1 << 25, 1 << 26} {
+ for _, mmapSize := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} {
if memory && mmapSize >= 0 {
continue
}