From 636b20b86016c726a7bfd20cd6d87600dee51e7d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 2 Nov 2020 15:35:07 +1100 Subject: [PATCH] Add write incomplete/consecutive chunks interfaces --- storage/interface.go | 4 ++ storage/piece_resource.go | 23 ++++++++++- storage/sqlite/sqlite-storage.go | 70 ++++++++++++++++++++++++++------ storage/wrappers.go | 10 +++++ torrent.go | 19 +++------ 5 files changed, 97 insertions(+), 29 deletions(-) diff --git a/storage/interface.go b/storage/interface.go index 49314677..6481d415 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -38,6 +38,10 @@ type PieceImpl interface { Completion() Completion } +type IncompletePieceToWriter interface { + WriteIncompleteTo(w io.Writer) error +} + type Completion struct { Complete bool Ok bool diff --git a/storage/piece_resource.go b/storage/piece_resource.go index 86a081f2..7f875320 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -13,10 +13,10 @@ import ( ) type piecePerResource struct { - p resource.Provider + p PieceProvider } -func NewResourcePieces(p resource.Provider) ClientImpl { +func NewResourcePieces(p PieceProvider) ClientImpl { return &piecePerResource{ p: p, } @@ -41,11 +41,29 @@ func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl { } } +type PieceProvider interface { + resource.Provider +} + +type ConsecutiveChunkWriter interface { + WriteConsecutiveChunks(prefix string, _ io.Writer) error +} + type piecePerResourcePiece struct { mp metainfo.Piece rp resource.Provider } +var _ IncompletePieceToWriter = piecePerResourcePiece{} + +func (s piecePerResourcePiece) WriteIncompleteTo(w io.Writer) error { + if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok { + return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w) + } + _, err := io.Copy(w, io.NewSectionReader(s.getChunks(), 0, s.mp.Length())) + return err +} + func (s piecePerResourcePiece) Completion() Completion { fi, err := s.completed().Stat() return Completion{ @@ -73,6 +91,7 @@ func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { if s.Completion().Complete { return s.completed().ReadAt(b, off) } + //panic("unexpected ReadAt of incomplete piece") return s.getChunks().ReadAt(b, off) } diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 4d000353..295daa3f 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -71,10 +71,17 @@ with recursive excess( where usage_with >= (select value from setting where name='capacity') ) select * from excess; -CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin +create trigger if not exists trim_blobs_to_capacity_after_update +after update of data on blob +when length(new.data)>length(old.data) and (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity') +begin delete from blob where rowid in (select blob_rowid from deletable_blob); end; -CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin + +create trigger if not exists trim_blobs_to_capacity_after_insert +after insert on blob +when (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity') +begin delete from blob where rowid in (select blob_rowid from deletable_blob); end; `) @@ -188,7 +195,13 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) { } writes := make(chan writeRequest) prov := &provider{pool: pool, writes: writes, opts: opts} - go prov.writer(writes) + runtime.SetFinalizer(prov, func(p *provider) { + // This is done in a finalizer, as it's easier than trying to synchronize on whether the + // channel has been closed. It also means that the provider writer can pass back errors from + // a closed ConnPool. + close(p.writes) + }) + go providerWriter(writes, prov.pool) return prov, nil } @@ -227,8 +240,33 @@ type provider struct { opts ProviderOpts } +var _ storage.ConsecutiveChunkWriter = (*provider)(nil) + +func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (err error) { + p.withConn(func(conn conn) { + err = io.EOF + err = sqlitex.Exec(conn, ` + select + cast(data as blob), + cast(substr(name, ?+1) as integer) as offset + from blob + where name like ?||'%' + order by offset`, + func(stmt *sqlite.Stmt) error { + r := stmt.ColumnReader(0) + //offset := stmt.ColumnInt64(1) + //log.Printf("got %v bytes at offset %v", r.Len(), offset) + _, err := io.Copy(w, r) + return err + }, + len(prefix), + prefix, + ) + }, false) + return +} + func (me *provider) Close() error { - close(me.writes) return me.pool.Close() } @@ -237,7 +275,9 @@ type writeRequest struct { done chan<- struct{} } -func (me *provider) writer(writes <-chan writeRequest) { +// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have +// stronger typing on the writes channel. +func providerWriter(writes <-chan writeRequest, pool ConnPool) { for { first, ok := <-writes if !ok { @@ -258,8 +298,8 @@ func (me *provider) writer(writes <-chan writeRequest) { } var cantFail error func() { - conn := me.pool.Get(context.TODO()) - defer me.pool.Put(conn) + conn := pool.Get(context.TODO()) + defer pool.Put(conn) defer sqlitex.Save(conn)(&cantFail) for _, wr := range buf { wr.query(conn) @@ -271,7 +311,7 @@ func (me *provider) writer(writes <-chan writeRequest) { for _, wr := range buf { close(wr.done) } - log.Printf("batched %v write queries", len(buf)) + //log.Printf("batched %v write queries", len(buf)) } } @@ -284,21 +324,25 @@ type instance struct { p *provider } -func (i instance) withConn(with func(conn conn), write bool) { - if write && i.p.opts.BatchWrites { +func (p *provider) withConn(with func(conn conn), write bool) { + if write && p.opts.BatchWrites { done := make(chan struct{}) - i.p.writes <- writeRequest{ + p.writes <- writeRequest{ query: with, done: done, } <-done } else { - conn := i.p.pool.Get(context.TODO()) - defer i.p.pool.Put(conn) + conn := p.pool.Get(context.TODO()) + defer p.pool.Put(conn) with(conn) } } +func (i instance) withConn(with func(conn conn), write bool) { + i.p.withConn(with, write) +} + func (i instance) getConn() *sqlite.Conn { return i.p.pool.Get(context.TODO()) } diff --git a/storage/wrappers.go b/storage/wrappers.go index e8e136eb..aa6c609b 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -38,6 +38,16 @@ type Piece struct { mip metainfo.Piece } +func (p Piece) WriteIncompleteTo(w io.Writer) error { + if i, ok := p.PieceImpl.(IncompletePieceToWriter); ok { + return i.WriteIncompleteTo(w) + } + n := p.mip.Length() + r := io.NewSectionReader(p, 0, n) + _, err := io.CopyN(w, r, n) + return err +} + func (p Piece) WriteAt(b []byte, off int64) (n int, err error) { // Callers should not be writing to completed pieces, but it's too // expensive to be checking this on every single write using uncached diff --git a/torrent.go b/torrent.go index 561d3d60..412fad7d 100644 --- a/torrent.go +++ b/torrent.go @@ -787,27 +787,18 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { return pp.Integer(t.info.PieceLength) } -func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error) { +func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) { hash := pieceHash.New() p := t.piece(piece) p.waitNoPendingWrites() - ip := t.info.Piece(int(piece)) - pl := ip.Length() - pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl) - var hashSource io.Reader - doCopy := func() { - // Return no error iff pl bytes are copied. - _, copyErr = io.CopyN(hash, hashSource, pl) - } + storagePiece := t.pieces[piece].Storage() const logPieceContents = false if logPieceContents { var examineBuf bytes.Buffer - hashSource = io.TeeReader(pieceReader, &examineBuf) - doCopy() - log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr) + err = storagePiece.WriteIncompleteTo(io.MultiWriter(hash, &examineBuf)) + log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err) } else { - hashSource = pieceReader - doCopy() + err = storagePiece.WriteIncompleteTo(hash) } missinggo.CopyExact(&ret, hash.Sum(nil)) return -- 2.48.1