)
type piecePerResource struct {
- p resource.Provider
+ p PieceProvider
}
-func NewResourcePieces(p resource.Provider) ClientImpl {
+func NewResourcePieces(p PieceProvider) ClientImpl {
return &piecePerResource{
p: p,
}
}
}
+// PieceProvider is the resource provider that piece-per-resource storage builds on.
+type PieceProvider interface {
+	resource.Provider
+}
+
+// ConsecutiveChunkWriter is an optional interface a PieceProvider can implement to stream
+// every chunk resource whose name starts with prefix to the writer in a single pass,
+// ordered by the numeric remainder of the name (the chunk's byte offset).
+type ConsecutiveChunkWriter interface {
+	WriteConsecutiveChunks(prefix string, _ io.Writer) error
+}
+
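// Illustrative sketch, not part of the change: a minimal in-memory implementation of the
// optional ConsecutiveChunkWriter interface, showing what implementers are expected to do.
// Chunk resources are named prefix+<decimal offset> (see the SQL-backed implementation
// further down), and their data must reach w in ascending offset order. memProvider and
// its chunks map are hypothetical; assumes imports io, sort, strconv, strings.
type memProvider struct {
	chunks map[string][]byte // resource name -> chunk data
}

func (m memProvider) WriteConsecutiveChunks(prefix string, w io.Writer) error {
	type chunk struct {
		offset int64
		data   []byte
	}
	var chunks []chunk
	for name, data := range m.chunks {
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		// The remainder of the name after the prefix is the chunk's byte offset.
		off, err := strconv.ParseInt(name[len(prefix):], 10, 64)
		if err != nil {
			return err
		}
		chunks = append(chunks, chunk{off, data})
	}
	// Emit the chunks in ascending offset order, i.e. as one consecutive stream.
	sort.Slice(chunks, func(i, j int) bool { return chunks[i].offset < chunks[j].offset })
	for _, c := range chunks {
		if _, err := w.Write(c.data); err != nil {
			return err
		}
	}
	return nil
}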
type piecePerResourcePiece struct {
mp metainfo.Piece
rp resource.Provider
}
+var _ IncompletePieceToWriter = piecePerResourcePiece{}
+
+func (s piecePerResourcePiece) WriteIncompleteTo(w io.Writer) error {
+ if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
+ return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w)
+ }
+ _, err := io.Copy(w, io.NewSectionReader(s.getChunks(), 0, s.mp.Length()))
+ return err
+}
+
func (s piecePerResourcePiece) Completion() Completion {
fi, err := s.completed().Stat()
return Completion{
if s.Completion().Complete {
return s.completed().ReadAt(b, off)
}
+ //panic("unexpected ReadAt of incomplete piece")
return s.getChunks().ReadAt(b, off)
}
where usage_with >= (select value from setting where name='capacity')
) select * from excess;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
+create trigger if not exists trim_blobs_to_capacity_after_update
+after update of data on blob
+when length(new.data)>length(old.data) and (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity')
+begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
+
+create trigger if not exists trim_blobs_to_capacity_after_insert
+after insert on blob
+when (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity')
+begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
`)
}
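// Illustrative sketch, not part of the change: the triggers above compare the total size of
// all blobs against the 'capacity' row of the setting table, so the trim threshold can be
// changed at runtime by updating that row. setCapacity is hypothetical; conn and sqlitex
// are the same crawshaw.io/sqlite types and helpers used elsewhere in this file.
func setCapacity(conn *sqlite.Conn, capacity int64) error {
	return sqlitex.Exec(conn, "update setting set value=? where name='capacity'", nil, capacity)
}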
writes := make(chan writeRequest)
prov := &provider{pool: pool, writes: writes, opts: opts}
- go prov.writer(writes)
+ runtime.SetFinalizer(prov, func(p *provider) {
+ // This is done in a finalizer, as it's easier than trying to synchronize on whether the
+ // channel has been closed. It also means that the provider writer can pass back errors from
+ // a closed ConnPool.
+ close(p.writes)
+ })
+ go providerWriter(writes, prov.pool)
return prov, nil
}
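// Illustrative sketch, not part of the change: the finalizer pattern used above, in
// miniature. The background goroutine is handed only the channel, never the owning value,
// so the owner can become unreachable; its finalizer then runs, closes the channel, and the
// goroutine drains and exits. All names here (owner, newOwner, work) are illustrative;
// assumes import runtime.
type owner struct {
	work chan func()
}

func newOwner() *owner {
	o := &owner{work: make(chan func())}
	// Closing the channel from a finalizer avoids coordinating an explicit Close with
	// senders: once o is unreachable, nothing can send on o.work any more.
	runtime.SetFinalizer(o, func(o *owner) { close(o.work) })
	go func(work <-chan func()) {
		for f := range work {
			f()
		}
	}(o.work)
	return o
}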
opts ProviderOpts
}
+var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
+
+func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (err error) {
+ p.withConn(func(conn conn) {
+ err = sqlitex.Exec(conn, `
+ select
+ cast(data as blob),
+ cast(substr(name, ?+1) as integer) as offset
+ from blob
+ where name like ?||'%'
+ order by offset`,
+ func(stmt *sqlite.Stmt) error {
+ r := stmt.ColumnReader(0)
+ //offset := stmt.ColumnInt64(1)
+ //log.Printf("got %v bytes at offset %v", r.Len(), offset)
+ _, err := io.Copy(w, r)
+ return err
+ },
+ len(prefix),
+ prefix,
+ )
+ }, false)
+ return
+}
+
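// Illustrative sketch, not part of the change: blob rows are named prefix+<decimal offset>,
// so the query above strips the prefix, casts the remainder to an integer offset, and
// streams the blobs back in offset order. A caller that only holds the generic provider
// interface can take the same optional-interface fast path the resource-piece code uses.
// tryWriteConsecutive is hypothetical; assumes io and the torrent storage package imports.
func tryWriteConsecutive(p storage.PieceProvider, prefix string, w io.Writer) (ok bool, err error) {
	ccw, isCCW := p.(storage.ConsecutiveChunkWriter)
	if !isCCW {
		return false, nil // caller falls back to reading chunk resources one at a time
	}
	return true, ccw.WriteConsecutiveChunks(prefix, w)
}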
func (me *provider) Close() error {
- close(me.writes)
return me.pool.Close()
}
done chan<- struct{}
}
-func (me *provider) writer(writes <-chan writeRequest) {
+// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
+// stronger typing on the writes channel.
+func providerWriter(writes <-chan writeRequest, pool ConnPool) {
for {
first, ok := <-writes
if !ok {
}
var cantFail error
func() {
- conn := me.pool.Get(context.TODO())
- defer me.pool.Put(conn)
+ conn := pool.Get(context.TODO())
+ defer pool.Put(conn)
defer sqlitex.Save(conn)(&cantFail)
for _, wr := range buf {
wr.query(conn)
for _, wr := range buf {
close(wr.done)
}
- log.Printf("batched %v write queries", len(buf))
+ //log.Printf("batched %v write queries", len(buf))
}
}
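// Illustrative sketch, not part of the change: one plausible shape for the batch-gathering
// step that this excerpt elides. After blocking on the first request, any requests already
// queued are drained without blocking, so the savepoint above covers the whole batch.
// drainPending is hypothetical; writeRequest is the type from this file.
func drainPending(first writeRequest, writes <-chan writeRequest) (buf []writeRequest) {
	buf = append(buf, first)
	for {
		select {
		case wr, ok := <-writes:
			if !ok {
				return
			}
			buf = append(buf, wr)
		default:
			return
		}
	}
}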
p *provider
}
-func (i instance) withConn(with func(conn conn), write bool) {
- if write && i.p.opts.BatchWrites {
+func (p *provider) withConn(with func(conn conn), write bool) {
+ if write && p.opts.BatchWrites {
done := make(chan struct{})
- i.p.writes <- writeRequest{
+ p.writes <- writeRequest{
query: with,
done: done,
}
<-done
} else {
- conn := i.p.pool.Get(context.TODO())
- defer i.p.pool.Put(conn)
+ conn := p.pool.Get(context.TODO())
+ defer p.pool.Put(conn)
with(conn)
}
}
+func (i instance) withConn(with func(conn conn), write bool) {
+ i.p.withConn(with, write)
+}
+
func (i instance) getConn() *sqlite.Conn {
return i.p.pool.Get(context.TODO())
}
mip metainfo.Piece
}
+// WriteIncompleteTo writes the piece's data to w, preferring the storage implementation's
+// IncompletePieceToWriter fast path when it's available and otherwise copying through the
+// piece's ReaderAt.
+func (p Piece) WriteIncompleteTo(w io.Writer) error {
+ if i, ok := p.PieceImpl.(IncompletePieceToWriter); ok {
+ return i.WriteIncompleteTo(w)
+ }
+ n := p.mip.Length()
+ r := io.NewSectionReader(p, 0, n)
+ _, err := io.CopyN(w, r, n)
+ return err
+}
+
func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
// Callers should not be writing to completed pieces, but it's too
// expensive to be checking this on every single write using uncached
return pp.Integer(t.info.PieceLength)
}
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error) {
+func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
hash := pieceHash.New()
p := t.piece(piece)
p.waitNoPendingWrites()
- ip := t.info.Piece(int(piece))
- pl := ip.Length()
- pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl)
- var hashSource io.Reader
- doCopy := func() {
- // Return no error iff pl bytes are copied.
- _, copyErr = io.CopyN(hash, hashSource, pl)
- }
+ storagePiece := t.pieces[piece].Storage()
const logPieceContents = false
if logPieceContents {
var examineBuf bytes.Buffer
- hashSource = io.TeeReader(pieceReader, &examineBuf)
- doCopy()
- log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr)
+ err = storagePiece.WriteIncompleteTo(io.MultiWriter(hash, &examineBuf))
+ log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
} else {
- hashSource = pieceReader
- doCopy()
+ err = storagePiece.WriteIncompleteTo(hash)
}
missinggo.CopyExact(&ret, hash.Sum(nil))
return