p metainfo.Piece
}
+var _ storage.PieceImpl = badStoragePiece{}
+
+// WriteAt on badStoragePiece claims success while writing nothing: it
+// returns n == 0 with a nil error. NOTE(review): test-double behavior —
+// callers here must tolerate a short write without an error.
func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
	return 0, nil
}
-func (p badStoragePiece) GetIsComplete() bool {
-	return true
+// Completion always reports the piece as complete, with Ok set because the
+// answer is definite for this stub.
+func (p badStoragePiece) Completion() storage.Completion {
+	return storage.Completion{Complete: true, Ok: true}
}
+
+// MarkComplete fails by returning `me`; presumably the enclosing type
+// satisfies the error interface — TODO confirm against the full definition.
func (p badStoragePiece) MarkComplete() error {
	return me
}
-func (me *torrentStorage) GetIsComplete() bool {
-	return false
+// Completion returns the zero value: not complete, and Ok false, meaning
+// this storage cannot vouch for the piece's state either way.
+func (me *torrentStorage) Completion() storage.Completion {
+	return storage.Completion{}
}
func (me *torrentStorage) MarkComplete() error {
package torrent
import (
+ "fmt"
+ "log"
"sync"
"github.com/anacrolix/missinggo/bitmap"
// length can be determined by the request chunkSize in use.
dirtyChunks bitmap.Bitmap
- hashing bool
- queuedForHash bool
- everHashed bool
- numVerifies int64
+ hashing bool
+ everHashed bool
+ numVerifies int64
+ storageCompletionOk bool
publicPieceState PieceState
priority piecePriority
noPendingWrites sync.Cond
}
+// String renders the piece as "<infohash-hex>/<index>" for log messages.
+func (p *Piece) String() string {
+	return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index)
+}
+
func (p *Piece) Info() metainfo.Piece {
return p.t.info.Piece(p.index)
}
if p.hashing {
target++
}
+ log.Printf("target: %d", target)
p.t.queuePieceCheck(p.index)
for p.numVerifies < target {
+ log.Printf("got %d verifies", p.numVerifies)
p.t.cl.event.Wait()
}
+ log.Print("done")
+}
+
+// queuedForHash reports whether the piece is queued for a hash check,
+// reading the torrent-level piecesQueuedForHash bitmap rather than a
+// per-piece flag.
+func (p *Piece) queuedForHash() bool {
+	return p.t.piecesQueuedForHash.Get(p.index)
+}
"github.com/anacrolix/torrent/metainfo"
)
+const (
+ boltDbCompleteValue = "c"
+ boltDbIncompleteValue = "i"
+)
+
var (
- value = []byte{}
+ completionBucketKey = []byte("completion")
)
type boltPieceCompletion struct {
db *bolt.DB
}
+var _ PieceCompletion = (*boltPieceCompletion)(nil)
+
func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) {
os.MkdirAll(dir, 0770)
p := filepath.Join(dir, ".torrent.bolt.db")
return
}
-func (me *boltPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) {
+// Get reads the stored completion state for pk. cn.Ok is false when the
+// completion bucket, the infohash sub-bucket, or the per-piece record is
+// absent or holds an unrecognized value.
+func (me boltPieceCompletion) Get(pk metainfo.PieceKey) (cn Completion, err error) {
	err = me.db.View(func(tx *bolt.Tx) error {
-		c := tx.Bucket(completed)
-		if c == nil {
+		cb := tx.Bucket(completionBucketKey)
+		if cb == nil {
			return nil
		}
-		ih := c.Bucket(pk.InfoHash[:])
+		ih := cb.Bucket(pk.InfoHash[:])
		if ih == nil {
			return nil
		}
+		// Pieces are keyed by their 4-byte big-endian index within the
+		// per-infohash sub-bucket.
		var key [4]byte
		binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
-		ret = ih.Get(key[:]) != nil
+		cn.Ok = true
+		switch string(ih.Get(key[:])) {
+		case boltDbCompleteValue:
+			cn.Complete = true
+		case boltDbIncompleteValue:
+			cn.Complete = false
+		default:
+			cn.Ok = false
+		}
		return nil
	})
	return
}
-func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
+// Set writes the completion state for pk, storing boltDbCompleteValue or
+// boltDbIncompleteValue under the piece's 4-byte big-endian index key.
+func (me boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
	return me.db.Update(func(tx *bolt.Tx) error {
-		c, err := tx.CreateBucketIfNotExists(completed)
+		c, err := tx.CreateBucketIfNotExists(completionBucketKey)
		if err != nil {
			return err
		}
	}
	var key [4]byte
	binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
-		if b {
-			return ih.Put(key[:], value)
-		} else {
-			return ih.Delete(key[:])
-		}
+		// Pick the persisted marker with a plain conditional rather than
+		// an immediately-invoked closure.
+		v := boltDbIncompleteValue
+		if b {
+			v = boltDbCompleteValue
+		}
+		return ih.Put(key[:], []byte(v))
	})
}
--- /dev/null
+package storage
+
+import (
+ "encoding/binary"
+
+ "github.com/anacrolix/missinggo/x"
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/boltdb/bolt"
+)
+
+// boltDBPiece stores one piece's data and completion state in a shared
+// bolt database. key is the 20-byte infohash followed by the piece index
+// as 4 big-endian bytes; chunkKey extends it per chunk.
+type boltDBPiece struct {
+	db *bolt.DB
+	p metainfo.Piece
+	ih metainfo.Hash
+	key [24]byte
+}
+
+var (
+	_ PieceImpl = (*boltDBPiece)(nil)
+	dataBucketKey = []byte("data")
+)
+
+// pc returns a completion tracker backed by this piece's database.
+func (me *boltDBPiece) pc() PieceCompletionGetSetter {
+	return boltPieceCompletion{me.db}
+}
+
+// pk is this piece's (infohash, index) key for completion lookups.
+func (me *boltDBPiece) pk() metainfo.PieceKey {
+	return metainfo.PieceKey{me.ih, me.p.Index()}
+}
+
+// Completion consults the shared completion bucket. x.Pie panics on a
+// database read error.
+func (me *boltDBPiece) Completion() Completion {
+	c, err := me.pc().Get(me.pk())
+	x.Pie(err)
+	return c
+}
+
+// MarkComplete records the piece as complete in the completion bucket.
+func (me *boltDBPiece) MarkComplete() error {
+	return me.pc().Set(me.pk(), true)
+}
+
+// MarkNotComplete records the piece as not complete.
+func (me *boltDBPiece) MarkNotComplete() error {
+	return me.pc().Set(me.pk(), false)
+}
+// ReadAt copies stored chunk data into b starting at off. Only fully
+// written chunks (exactly chunkSize bytes) are readable; the loop stops at
+// the first missing or short chunk. NOTE(review): a short read returns
+// n < len(b) with a nil error, which the io.ReaderAt contract says should
+// be accompanied by a non-nil error — confirm callers handle this.
+func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
+	err = me.db.View(func(tx *bolt.Tx) error {
+		db := tx.Bucket(dataBucketKey)
+		if db == nil {
+			return nil
+		}
+		ci := off / chunkSize
+		off %= chunkSize
+		for len(b) != 0 {
+			ck := me.chunkKey(int(ci))
+			_b := db.Get(ck[:])
+			if len(_b) != chunkSize {
+				break
+			}
+			n1 := copy(b, _b[off:])
+			// Only the first chunk read honours the intra-chunk offset.
+			off = 0
+			ci++
+			b = b[n1:]
+			n += n1
+		}
+		return nil
+	})
+	return
+}
+
+// chunkKey appends the 16-bit big-endian chunk index to the piece key
+// (infohash + piece index), yielding the bolt key for one chunk record.
+func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
+	copy(ret[:], me.key[:])
+	binary.BigEndian.PutUint16(ret[24:], uint16(index))
+	return
+}
+
+// WriteAt stores b into fixed-size chunk records starting at off. Each
+// affected chunk is read-modify-written, so writes may be unaligned and
+// shorter than a chunk.
+func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
+	err = me.db.Update(func(tx *bolt.Tx) error {
+		db, err := tx.CreateBucketIfNotExists(dataBucketKey)
+		if err != nil {
+			return err
+		}
+		ci := off / chunkSize
+		off %= chunkSize
+		for len(b) != 0 {
+			_b := make([]byte, chunkSize)
+			ck := me.chunkKey(int(ci))
+			copy(_b, db.Get(ck[:]))
+			n1 := copy(_b[off:], b)
+			// Don't drop storage failures: the previous version ignored
+			// Put's error, silently losing data on a failing db.
+			if err := db.Put(ck[:], _b); err != nil {
+				return err
+			}
+			// copy never returns more than len(b), so the old
+			// `if n1 > len(b) { break }` guard was unreachable; the loop
+			// terminates because n1 >= 1 while off < chunkSize.
+			b = b[n1:]
+			off = 0
+			ci++
+			n += n1
+		}
+		return nil
+	})
+	return
+}
chunkSize = 1 << 14
)
-var (
- // The key for the data bucket.
- data = []byte("data")
- // The key for the completion flag bucket.
- completed = []byte("completed")
- // The value to assigned to pieces that are complete in the completed
- // bucket.
- completedValue = []byte{1}
-)
-
type boltDBClient struct {
db *bolt.DB
}
ih metainfo.Hash
}
-type boltDBPiece struct {
- db *bolt.DB
- p metainfo.Piece
- key [24]byte
-}
-
func NewBoltDB(filePath string) ClientImpl {
ret := &boltDBClient{}
var err error
}
func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
-	ret := &boltDBPiece{p: p, db: me.cl.db}
+	ret := &boltDBPiece{
+		p: p,
+		db: me.cl.db,
+		ih: me.ih,
+	}
+	// key layout: 20-byte infohash, then the piece index as 4 big-endian
+	// bytes at offset 20.
	copy(ret.key[:], me.ih[:])
	binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index()))
	return ret
}
func (boltDBTorrent) Close() error { return nil }
-
-func (me *boltDBPiece) GetIsComplete() (complete bool) {
- err := me.db.View(func(tx *bolt.Tx) error {
- cb := tx.Bucket(completed)
- // db := tx.Bucket(data)
- complete =
- cb != nil && len(cb.Get(me.key[:])) != 0
- // db != nil && int64(len(db.Get(me.key[:]))) == me.p.Length()
- return nil
- })
- if err != nil {
- panic(err)
- }
- return
-}
-
-func (me *boltDBPiece) MarkComplete() error {
- return me.db.Update(func(tx *bolt.Tx) error {
- b, err := tx.CreateBucketIfNotExists(completed)
- if err != nil {
- return err
- }
- return b.Put(me.key[:], completedValue)
- })
-}
-
-func (me *boltDBPiece) MarkNotComplete() error {
- return me.db.Update(func(tx *bolt.Tx) error {
- b := tx.Bucket(completed)
- if b == nil {
- return nil
- }
- return b.Delete(me.key[:])
- })
-}
-func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
- err = me.db.View(func(tx *bolt.Tx) error {
- db := tx.Bucket(data)
- if db == nil {
- return nil
- }
- ci := off / chunkSize
- off %= chunkSize
- for len(b) != 0 {
- ck := me.chunkKey(int(ci))
- _b := db.Get(ck[:])
- if len(_b) != chunkSize {
- break
- }
- n1 := copy(b, _b[off:])
- off = 0
- ci++
- b = b[n1:]
- n += n1
- }
- return nil
- })
- return
-}
-
-func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
- copy(ret[:], me.key[:])
- binary.BigEndian.PutUint16(ret[24:], uint16(index))
- return
-}
-
-func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
- err = me.db.Update(func(tx *bolt.Tx) error {
- db, err := tx.CreateBucketIfNotExists(data)
- if err != nil {
- return err
- }
- ci := off / chunkSize
- off %= chunkSize
- for len(b) != 0 {
- _b := make([]byte, chunkSize)
- ck := me.chunkKey(int(ci))
- copy(_b, db.Get(ck[:]))
- n1 := copy(_b[off:], b)
- db.Put(ck[:], _b)
- if n1 > len(b) {
- break
- }
- b = b[n1:]
- off = 0
- ci++
- n += n1
- }
- return nil
- })
- return
-}
b, err := pc.Get(pk)
require.NoError(t, err)
- assert.False(t, b)
+ assert.False(t, b.Ok)
require.NoError(t, pc.Set(pk, false))
b, err = pc.Get(pk)
require.NoError(t, err)
- assert.False(t, b)
+ assert.Equal(t, Completion{Complete: false, Ok: true}, b)
require.NoError(t, pc.Set(pk, true))
b, err = pc.Get(pk)
require.NoError(t, err)
- assert.True(t, b)
+ assert.Equal(t, Completion{Complete: true, Ok: true}, b)
}
"github.com/anacrolix/torrent/metainfo"
)
+// PieceCompletionGetSetter is the read/write half of completion tracking,
+// usable by backends that have no Close semantics of their own.
+type PieceCompletionGetSetter interface {
+	Get(metainfo.PieceKey) (Completion, error)
+	Set(metainfo.PieceKey, bool) error
+}
+
// Implementations track the completion of pieces. It must be concurrent-safe.
type PieceCompletion interface {
-	Get(metainfo.PieceKey) (bool, error)
-	Set(metainfo.PieceKey, bool) error
+	PieceCompletionGetSetter
	Close() error
}
type mapPieceCompletion struct {
	mu sync.Mutex
-	m map[metainfo.PieceKey]struct{}
+	// m records an explicit true/false per piece; absence means "unknown"
+	// (Completion.Ok == false). Guarded by mu.
+	m map[metainfo.PieceKey]bool
}
+var _ PieceCompletion = (*mapPieceCompletion)(nil)
+
func NewMapPieceCompletion() PieceCompletion {
	return &mapPieceCompletion{m: make(map[metainfo.PieceKey]bool)}
}
func (*mapPieceCompletion) Close() error { return nil }
-func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (bool, error) {
+// Get returns the recorded state for pk; the comma-ok map read doubles as
+// the Completion.Ok flag.
+func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
	me.mu.Lock()
-	_, ok := me.m[pk]
-	me.mu.Unlock()
-	return ok, nil
+	defer me.mu.Unlock()
+	c.Complete, c.Ok = me.m[pk]
+	return
}
+
+// Set records b for pk, lazily allocating m so the zero value is usable.
func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
	me.mu.Lock()
-	if b {
-		if me.m == nil {
-			me.m = make(map[metainfo.PieceKey]struct{})
-		}
-		me.m[pk] = struct{}{}
-	} else {
-		delete(me.m, pk)
+	defer me.mu.Unlock()
+	if me.m == nil {
+		me.m = make(map[metainfo.PieceKey]bool)
	}
-	me.mu.Unlock()
+	me.m[pk] = b
	return nil
}
// Create a view onto the file-based torrent storage.
_io := fileTorrentImplIO{fts}
// Return the appropriate segments of this.
- return &fileStoragePiece{
+ return &filePieceImpl{
fts,
p,
missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
--- /dev/null
+package storage
+
+import (
+ "io"
+ "os"
+
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+// filePieceImpl adapts one piece of file-backed torrent storage to
+// PieceImpl, delegating reads and writes to the embedded section
+// reader/writer views over the torrent's files.
+type filePieceImpl struct {
+	*fileTorrentImpl
+	p metainfo.Piece
+	io.WriterAt
+	io.ReaderAt
+}
+
+var _ PieceImpl = (*filePieceImpl)(nil)
+
+// pieceKey identifies this piece to the completion backend.
+func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
+	return metainfo.PieceKey{me.infoHash, me.p.Index()}
+}
+
+// Completion returns the stored completion state, verifying an alleged
+// "complete" against the on-disk lengths of the piece's constituent files
+// and correcting the record when they disagree.
+func (fs *filePieceImpl) Completion() Completion {
+	c, err := fs.completion.Get(fs.pieceKey())
+	if err != nil || !c.Ok {
+		return Completion{Ok: false}
+	}
+	if !c.Complete {
+		// Already recorded as incomplete: no need to stat the files or
+		// rewrite the same state into the completion backend on every
+		// query (the replaced fileStoragePiece code early-returned here
+		// too).
+		return c
+	}
+	// If it's allegedly complete, check that its constituent files have
+	// the necessary length.
+	for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
+		s, err := os.Stat(fs.fileInfoName(fi))
+		if err != nil || s.Size() < fi.Length {
+			c.Complete = false
+			break
+		}
+	}
+	if !c.Complete {
+		// The stored completion was wrong; fix it.
+		fs.completion.Set(fs.pieceKey(), false)
+	}
+	return c
+}
+
+// MarkComplete records the piece as complete, propagating backend errors
+// (the replaced fileStoragePiece version discarded them).
+func (fs *filePieceImpl) MarkComplete() error {
+	return fs.completion.Set(fs.pieceKey(), true)
+}
+
+// MarkNotComplete records the piece as not complete.
+func (fs *filePieceImpl) MarkNotComplete() error {
+	return fs.completion.Set(fs.pieceKey(), false)
+}
+++ /dev/null
-package storage
-
-import (
- "io"
- "os"
-
- "github.com/anacrolix/torrent/metainfo"
-)
-
-type fileStoragePiece struct {
- *fileTorrentImpl
- p metainfo.Piece
- io.WriterAt
- io.ReaderAt
-}
-
-func (me *fileStoragePiece) pieceKey() metainfo.PieceKey {
- return metainfo.PieceKey{me.infoHash, me.p.Index()}
-}
-
-func (fs *fileStoragePiece) GetIsComplete() bool {
- ret, err := fs.completion.Get(fs.pieceKey())
- if err != nil || !ret {
- return false
- }
- // If it's allegedly complete, check that its constituent files have the
- // necessary length.
- for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
- s, err := os.Stat(fs.fileInfoName(fi))
- if err != nil || s.Size() < fi.Length {
- ret = false
- break
- }
- }
- if ret {
- return true
- }
- // The completion was wrong, fix it.
- fs.completion.Set(fs.pieceKey(), false)
- return false
-}
-
-func (fs *fileStoragePiece) MarkComplete() error {
- fs.completion.Set(fs.pieceKey(), true)
- return nil
-}
-
-func (fs *fileStoragePiece) MarkNotComplete() error {
- fs.completion.Set(fs.pieceKey(), false)
- return nil
-}
	MarkComplete() error
	MarkNotComplete() error
-	// Returns true if the piece is complete.
-	GetIsComplete() bool
+	// Completion reports what the backend knows about the piece: the
+	// Complete flag, plus Ok saying whether that flag is meaningful.
+	Completion() Completion
+}
+
+// Completion is a storage backend's answer about a piece's completeness.
+// Ok is false when the backend has no usable record for the piece.
+type Completion struct {
+	Complete bool
+	Ok bool
}
require.NoError(t, err)
t2p := t2.Piece(i2.Piece(0))
assert.NoError(t, t1.Close())
- assert.NotPanics(t, func() { t2p.GetIsComplete() })
+ assert.NotPanics(t, func() { t2p.Completion() })
}
func TestIssue95File(t *testing.T) {
n, err := p.ReadAt(make([]byte, 1), 0)
require.Error(t, err)
require.EqualValues(t, 0, n)
- require.False(t, p.GetIsComplete())
+ require.False(t, p.Completion().Complete)
}
func TestMarkedCompleteMissingOnReadFile(t *testing.T) {
return metainfo.PieceKey{me.ih, me.p.Index()}
}
-func (sp mmapStoragePiece) GetIsComplete() (ret bool) {
-	ret, _ = sp.pc.Get(sp.pieceKey())
-	return
+// Completion fetches the stored state from the completion backend. The
+// lookup error is discarded, so a failed read looks like the zero
+// Completion (Ok == false) — NOTE(review): confirm that's acceptable.
+func (sp mmapStoragePiece) Completion() Completion {
+	c, _ := sp.pc.Get(sp.pieceKey())
+	return c
}
func (sp mmapStoragePiece) MarkComplete() error {
i resource.Instance
}
-func (s piecePerResourcePiece) GetIsComplete() bool {
+// Completion derives completeness from the consolidated resource's size.
+// Ok is always true, even when Stat fails — a stat error reads as
+// "definitely incomplete". NOTE(review): confirm that's intended rather
+// than Ok: err == nil.
+func (s piecePerResourcePiece) Completion() Completion {
	fi, err := s.c.Stat()
-	return err == nil && fi.Size() == s.p.Length()
+	return Completion{
+		Complete: err == nil && fi.Size() == s.p.Length(),
+		Ok: true,
+	}
}
func (s piecePerResourcePiece) MarkComplete() error {
}
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
- if s.GetIsComplete() {
+ if s.Completion().Complete {
return s.c.ReadAt(b, off)
} else {
return s.i.ReadAt(b, off)
}
func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
- if p.GetIsComplete() {
- err = errors.New("piece completed")
+ c := p.Completion()
+ if c.Ok && c.Complete {
+ err = errors.New("piece already completed")
return
}
if off+int64(len(b)) > p.mip.Length() {
pendingPieces bitmap.Bitmap
// A cache of completed piece indices.
completedPieces bitmap.Bitmap
+ // Pieces that need to be hashed.
+ piecesQueuedForHash bitmap.Bitmap
// A pool of piece priorities []int for assignment to new connections.
// These "inclinations" are used to give connections preference for
return t.completedPieces.Get(piece)
}
-func (t *Torrent) pieceCompleteUncached(piece int) bool {
-	return t.pieces[piece].Storage().GetIsComplete()
+// pieceCompleteUncached asks the storage backend directly, bypassing the
+// completedPieces bitmap cache.
+func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
+	return t.pieces[piece].Storage().Completion()
}
// There's a connection to that address already.
}
for i := range t.pieces {
t.updatePieceCompletion(i)
- // t.pieces[i].QueuedForHash = true
+ p := &t.pieces[i]
+ if !p.storageCompletionOk {
+ log.Printf("piece %s completion unknown, queueing check", p)
+ t.queuePieceCheck(i)
+ }
}
- // go func() {
- // for i := range t.pieces {
- // t.verifyPiece(i)
- // }
- // }()
return nil
}
if t.pieceComplete(index) {
ret.Complete = true
}
- if p.queuedForHash || p.hashing {
+ if p.queuedForHash() || p.hashing {
ret.Checking = true
}
if !ret.Complete && t.piecePartiallyDownloaded(index) {
return false
}
p := &t.pieces[index]
- if p.queuedForHash {
+ if p.queuedForHash() {
return false
}
if p.hashing {
func (t *Torrent) updatePieceCompletion(piece int) {
	pcu := t.pieceCompleteUncached(piece)
-	changed := t.completedPieces.Get(piece) != pcu
-	t.completedPieces.Set(piece, pcu)
+	p := &t.pieces[piece]
+	// A change in either the cached Complete bit or the Ok ("storage has
+	// an answer") bit triggers pieceCompletionChanged.
+	changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
+	p.storageCompletionOk = pcu.Ok
+	t.completedPieces.Set(piece, pcu.Complete)
	if changed {
		t.pieceCompletionChanged(piece)
	}
cl.mu.Lock()
defer cl.mu.Unlock()
p := &t.pieces[piece]
+ defer func() {
+ p.numVerifies++
+ cl.event.Broadcast()
+ }()
for p.hashing || t.storage == nil {
cl.event.Wait()
}
- p.queuedForHash = false
+ if !p.t.piecesQueuedForHash.Remove(piece) {
+ panic("piece was not queued")
+ }
if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece)
+ log.Println("early return", t.closed.IsSet(), t.pieceComplete(piece))
return
}
p.hashing = true
cl.mu.Unlock()
sum := t.hashPiece(piece)
cl.mu.Lock()
- p.numVerifies++
p.hashing = false
t.pieceHashed(piece, sum == p.hash)
}
// Currently doesn't really queue, but should in the future.
func (t *Torrent) queuePieceCheck(pieceIndex int) {
	piece := &t.pieces[pieceIndex]
-	if piece.queuedForHash {
+	// queuedForHash now reads the torrent-level bitmap.
+	if piece.queuedForHash() {
		return
	}
-	piece.queuedForHash = true
+	t.piecesQueuedForHash.Add(pieceIndex)
	t.publishPieceChange(pieceIndex)
+	// Each check runs on its own goroutine.
	go t.verifyPiece(pieceIndex)
}