]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add write incomplete/consecutive chunks interfaces
authorMatt Joiner <anacrolix@gmail.com>
Mon, 2 Nov 2020 04:35:07 +0000 (15:35 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 2 Nov 2020 04:35:07 +0000 (15:35 +1100)
storage/interface.go
storage/piece_resource.go
storage/sqlite/sqlite-storage.go
storage/wrappers.go
torrent.go

index 49314677bebc8cfc3f193279e703350a10afb7cf..6481d415d2157e49168d02e2b3ceec2f92bd61ac 100644 (file)
@@ -38,6 +38,10 @@ type PieceImpl interface {
        Completion() Completion
 }
 
+type IncompletePieceToWriter interface {
+       WriteIncompleteTo(w io.Writer) error
+}
+
 type Completion struct {
        Complete bool
        Ok       bool
index 86a081f24030d1c8ef3dc77a1c6524aa7c6d3b9c..7f87532042de21bb389c9b881da8a2288dc90bba 100644 (file)
@@ -13,10 +13,10 @@ import (
 )
 
 type piecePerResource struct {
-       p resource.Provider
+       p PieceProvider
 }
 
-func NewResourcePieces(p resource.Provider) ClientImpl {
+func NewResourcePieces(p PieceProvider) ClientImpl {
        return &piecePerResource{
                p: p,
        }
@@ -41,11 +41,29 @@ func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
        }
 }
 
+type PieceProvider interface {
+       resource.Provider
+}
+
+type ConsecutiveChunkWriter interface {
+       WriteConsecutiveChunks(prefix string, _ io.Writer) error
+}
+
 type piecePerResourcePiece struct {
        mp metainfo.Piece
        rp resource.Provider
 }
 
+var _ IncompletePieceToWriter = piecePerResourcePiece{}
+
+func (s piecePerResourcePiece) WriteIncompleteTo(w io.Writer) error {
+       if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
+               return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w)
+       }
+       _, err := io.Copy(w, io.NewSectionReader(s.getChunks(), 0, s.mp.Length()))
+       return err
+}
+
 func (s piecePerResourcePiece) Completion() Completion {
        fi, err := s.completed().Stat()
        return Completion{
@@ -73,6 +91,7 @@ func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
        if s.Completion().Complete {
                return s.completed().ReadAt(b, off)
        }
+       //panic("unexpected ReadAt of incomplete piece")
        return s.getChunks().ReadAt(b, off)
 }
 
index 4d0003533be956553ddf41ca2e37ce2372d497c0..295daa3fcb6fd16c7f3b80e6c54dbec3ba5e6ff4 100644 (file)
@@ -71,10 +71,17 @@ with recursive excess(
        where usage_with >= (select value from setting where name='capacity')
 ) select * from excess;
 
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin 
+create trigger if not exists trim_blobs_to_capacity_after_update
+after update of data on blob
+when length(new.data)>length(old.data) and (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity')
+begin 
        delete from blob where rowid in (select blob_rowid from deletable_blob);
 end;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
+
+create trigger if not exists trim_blobs_to_capacity_after_insert 
+after insert on blob
+when (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity')
+begin
        delete from blob where rowid in (select blob_rowid from deletable_blob);
 end;
 `)
@@ -188,7 +195,13 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
        }
        writes := make(chan writeRequest)
        prov := &provider{pool: pool, writes: writes, opts: opts}
-       go prov.writer(writes)
+       runtime.SetFinalizer(prov, func(p *provider) {
+               // This is done in a finalizer, as it's easier than trying to synchronize on whether the
+               // channel has been closed. It also means that the provider writer can pass back errors from
+               // a closed ConnPool.
+               close(p.writes)
+       })
+       go providerWriter(writes, prov.pool)
        return prov, nil
 }
 
@@ -227,8 +240,33 @@ type provider struct {
        opts   ProviderOpts
 }
 
+var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
+
+func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (err error) {
+       p.withConn(func(conn conn) {
+               err = io.EOF
+               err = sqlitex.Exec(conn, `
+                               select
+                                       cast(data as blob),
+                                       cast(substr(name, ?+1) as integer) as offset
+                               from blob
+                               where name like ?||'%'
+                               order by offset`,
+                       func(stmt *sqlite.Stmt) error {
+                               r := stmt.ColumnReader(0)
+                               //offset := stmt.ColumnInt64(1)
+                               //log.Printf("got %v bytes at offset %v", r.Len(), offset)
+                               _, err := io.Copy(w, r)
+                               return err
+                       },
+                       len(prefix),
+                       prefix,
+               )
+       }, false)
+       return
+}
+
 func (me *provider) Close() error {
-       close(me.writes)
        return me.pool.Close()
 }
 
@@ -237,7 +275,9 @@ type writeRequest struct {
        done  chan<- struct{}
 }
 
-func (me *provider) writer(writes <-chan writeRequest) {
+// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
+// stronger typing on the writes channel.
+func providerWriter(writes <-chan writeRequest, pool ConnPool) {
        for {
                first, ok := <-writes
                if !ok {
@@ -258,8 +298,8 @@ func (me *provider) writer(writes <-chan writeRequest) {
                }
                var cantFail error
                func() {
-                       conn := me.pool.Get(context.TODO())
-                       defer me.pool.Put(conn)
+                       conn := pool.Get(context.TODO())
+                       defer pool.Put(conn)
                        defer sqlitex.Save(conn)(&cantFail)
                        for _, wr := range buf {
                                wr.query(conn)
@@ -271,7 +311,7 @@ func (me *provider) writer(writes <-chan writeRequest) {
                for _, wr := range buf {
                        close(wr.done)
                }
-               log.Printf("batched %v write queries", len(buf))
+               //log.Printf("batched %v write queries", len(buf))
        }
 }
 
@@ -284,21 +324,25 @@ type instance struct {
        p        *provider
 }
 
-func (i instance) withConn(with func(conn conn), write bool) {
-       if write && i.p.opts.BatchWrites {
+func (p *provider) withConn(with func(conn conn), write bool) {
+       if write && p.opts.BatchWrites {
                done := make(chan struct{})
-               i.p.writes <- writeRequest{
+               p.writes <- writeRequest{
                        query: with,
                        done:  done,
                }
                <-done
        } else {
-               conn := i.p.pool.Get(context.TODO())
-               defer i.p.pool.Put(conn)
+               conn := p.pool.Get(context.TODO())
+               defer p.pool.Put(conn)
                with(conn)
        }
 }
 
+func (i instance) withConn(with func(conn conn), write bool) {
+       i.p.withConn(with, write)
+}
+
 func (i instance) getConn() *sqlite.Conn {
        return i.p.pool.Get(context.TODO())
 }
index e8e136eba6515572799c6e39d60984bbecfe9730..aa6c609b1d4d2595ad5d2adf7a8411a68052beb5 100644 (file)
@@ -38,6 +38,16 @@ type Piece struct {
        mip metainfo.Piece
 }
 
+func (p Piece) WriteIncompleteTo(w io.Writer) error {
+       if i, ok := p.PieceImpl.(IncompletePieceToWriter); ok {
+               return i.WriteIncompleteTo(w)
+       }
+       n := p.mip.Length()
+       r := io.NewSectionReader(p, 0, n)
+       _, err := io.CopyN(w, r, n)
+       return err
+}
+
 func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
        // Callers should not be writing to completed pieces, but it's too
        // expensive to be checking this on every single write using uncached
index 561d3d60bbd9ef265af1f748d19f6b5ca1a9dd24..412fad7da44750ecead326aa4de065695f1656b6 100644 (file)
@@ -787,27 +787,18 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
        return pp.Integer(t.info.PieceLength)
 }
 
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error) {
+func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
        hash := pieceHash.New()
        p := t.piece(piece)
        p.waitNoPendingWrites()
-       ip := t.info.Piece(int(piece))
-       pl := ip.Length()
-       pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl)
-       var hashSource io.Reader
-       doCopy := func() {
-               // Return no error iff pl bytes are copied.
-               _, copyErr = io.CopyN(hash, hashSource, pl)
-       }
+       storagePiece := t.pieces[piece].Storage()
        const logPieceContents = false
        if logPieceContents {
                var examineBuf bytes.Buffer
-               hashSource = io.TeeReader(pieceReader, &examineBuf)
-               doCopy()
-               log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr)
+               err = storagePiece.WriteIncompleteTo(io.MultiWriter(hash, &examineBuf))
+               log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
        } else {
-               hashSource = pieceReader
-               doCopy()
+               err = storagePiece.WriteIncompleteTo(hash)
        }
        missinggo.CopyExact(&ret, hash.Sum(nil))
        return