]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Track completion known to implementation state
authorMatt Joiner <anacrolix@gmail.com>
Thu, 12 Oct 2017 05:09:32 +0000 (16:09 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 12 Oct 2017 05:09:32 +0000 (16:09 +1100)
Addresses #193

19 files changed:
client_test.go
connection_test.go
piece.go
storage/boltPieceCompletion.go
storage/bolt_piece.go [new file with mode: 0644]
storage/boltdb.go
storage/boltpc_test.go
storage/completion.go
storage/completion_piece_map.go
storage/file.go
storage/file_piece.go [new file with mode: 0644]
storage/file_storage_piece.go [deleted file]
storage/interface.go
storage/issue95_test.go
storage/issue96_test.go
storage/mmap.go
storage/piece_resource.go
storage/wrappers.go
torrent.go

index fc008b99e399a9a334571ff8db6afa8af35cb078..539bb4480382ca6b00a5bb7264e0643dab12458e 100644 (file)
@@ -537,12 +537,14 @@ type badStoragePiece struct {
        p metainfo.Piece
 }
 
+var _ storage.PieceImpl = badStoragePiece{}
+
 func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
        return 0, nil
 }
 
-func (p badStoragePiece) GetIsComplete() bool {
-       return true
+func (p badStoragePiece) Completion() storage.Completion {
+       return storage.Completion{Complete: true, Ok: true}
 }
 
 func (p badStoragePiece) MarkComplete() error {
index fad120c8387f1985b243f46dae712c74bbabd1f8..3f6dfd93b17b282c023adbdaff18750b977a8fef 100644 (file)
@@ -58,8 +58,8 @@ func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
        return me
 }
 
-func (me *torrentStorage) GetIsComplete() bool {
-       return false
+func (me *torrentStorage) Completion() storage.Completion {
+       return storage.Completion{}
 }
 
 func (me *torrentStorage) MarkComplete() error {
index e09e00c8da5160dbcd14ffb92e53b7b88ce8c6fd..55e813af572e2438ce658a9a51e2ebbe4890ea04 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -1,6 +1,8 @@
 package torrent
 
 import (
+       "fmt"
+       "log"
        "sync"
 
        "github.com/anacrolix/missinggo/bitmap"
@@ -38,10 +40,10 @@ type Piece struct {
        // length can be determined by the request chunkSize in use.
        dirtyChunks bitmap.Bitmap
 
-       hashing       bool
-       queuedForHash bool
-       everHashed    bool
-       numVerifies   int64
+       hashing             bool
+       everHashed          bool
+       numVerifies         int64
+       storageCompletionOk bool
 
        publicPieceState PieceState
        priority         piecePriority
@@ -51,6 +53,10 @@ type Piece struct {
        noPendingWrites    sync.Cond
 }
 
+func (p *Piece) String() string {
+       return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index)
+}
+
 func (p *Piece) Info() metainfo.Piece {
        return p.t.info.Piece(p.index)
 }
@@ -168,8 +174,15 @@ func (p *Piece) VerifyData() {
        if p.hashing {
                target++
        }
+       log.Printf("target: %d", target)
        p.t.queuePieceCheck(p.index)
        for p.numVerifies < target {
+               log.Printf("got %d verifies", p.numVerifies)
                p.t.cl.event.Wait()
        }
+       log.Print("done")
+}
+
+func (p *Piece) queuedForHash() bool {
+       return p.t.piecesQueuedForHash.Get(p.index)
 }
index 52e38f6a69564052f1cf65e8b0fd0f9636498424..93cf8d6f666da0a033201edb958841028c694365 100644 (file)
@@ -11,14 +11,21 @@ import (
        "github.com/anacrolix/torrent/metainfo"
 )
 
+const (
+       boltDbCompleteValue   = "c"
+       boltDbIncompleteValue = "i"
+)
+
 var (
-       value = []byte{}
+       completionBucketKey = []byte("completion")
 )
 
 type boltPieceCompletion struct {
        db *bolt.DB
 }
 
+var _ PieceCompletion = (*boltPieceCompletion)(nil)
+
 func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) {
        os.MkdirAll(dir, 0770)
        p := filepath.Join(dir, ".torrent.bolt.db")
@@ -32,27 +39,35 @@ func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) {
        return
 }
 
-func (me *boltPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) {
+func (me boltPieceCompletion) Get(pk metainfo.PieceKey) (cn Completion, err error) {
        err = me.db.View(func(tx *bolt.Tx) error {
-               c := tx.Bucket(completed)
-               if c == nil {
+               cb := tx.Bucket(completionBucketKey)
+               if cb == nil {
                        return nil
                }
-               ih := c.Bucket(pk.InfoHash[:])
+               ih := cb.Bucket(pk.InfoHash[:])
                if ih == nil {
                        return nil
                }
                var key [4]byte
                binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
-               ret = ih.Get(key[:]) != nil
+               cn.Ok = true
+               switch string(ih.Get(key[:])) {
+               case boltDbCompleteValue:
+                       cn.Complete = true
+               case boltDbIncompleteValue:
+                       cn.Complete = false
+               default:
+                       cn.Ok = false
+               }
                return nil
        })
        return
 }
 
-func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
+func (me boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
        return me.db.Update(func(tx *bolt.Tx) error {
-               c, err := tx.CreateBucketIfNotExists(completed)
+               c, err := tx.CreateBucketIfNotExists(completionBucketKey)
                if err != nil {
                        return err
                }
@@ -62,11 +77,13 @@ func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
                }
                var key [4]byte
                binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
-               if b {
-                       return ih.Put(key[:], value)
-               } else {
-                       return ih.Delete(key[:])
-               }
+               return ih.Put(key[:], []byte(func() string {
+                       if b {
+                               return boltDbCompleteValue
+                       } else {
+                               return boltDbIncompleteValue
+                       }
+               }()))
        })
 }
 
diff --git a/storage/bolt_piece.go b/storage/bolt_piece.go
new file mode 100644 (file)
index 0000000..a9e22bb
--- /dev/null
@@ -0,0 +1,100 @@
+package storage
+
+import (
+       "encoding/binary"
+
+       "github.com/anacrolix/missinggo/x"
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/boltdb/bolt"
+)
+
+type boltDBPiece struct {
+       db  *bolt.DB
+       p   metainfo.Piece
+       ih  metainfo.Hash
+       key [24]byte
+}
+
+var (
+       _             PieceImpl = (*boltDBPiece)(nil)
+       dataBucketKey           = []byte("data")
+)
+
+func (me *boltDBPiece) pc() PieceCompletionGetSetter {
+       return boltPieceCompletion{me.db}
+}
+
+func (me *boltDBPiece) pk() metainfo.PieceKey {
+       return metainfo.PieceKey{me.ih, me.p.Index()}
+}
+
+func (me *boltDBPiece) Completion() Completion {
+       c, err := me.pc().Get(me.pk())
+       x.Pie(err)
+       return c
+}
+
+func (me *boltDBPiece) MarkComplete() error {
+       return me.pc().Set(me.pk(), true)
+}
+
+func (me *boltDBPiece) MarkNotComplete() error {
+       return me.pc().Set(me.pk(), false)
+}
+func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
+       err = me.db.View(func(tx *bolt.Tx) error {
+               db := tx.Bucket(dataBucketKey)
+               if db == nil {
+                       return nil
+               }
+               ci := off / chunkSize
+               off %= chunkSize
+               for len(b) != 0 {
+                       ck := me.chunkKey(int(ci))
+                       _b := db.Get(ck[:])
+                       if len(_b) != chunkSize {
+                               break
+                       }
+                       n1 := copy(b, _b[off:])
+                       off = 0
+                       ci++
+                       b = b[n1:]
+                       n += n1
+               }
+               return nil
+       })
+       return
+}
+
+func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
+       copy(ret[:], me.key[:])
+       binary.BigEndian.PutUint16(ret[24:], uint16(index))
+       return
+}
+
+func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
+       err = me.db.Update(func(tx *bolt.Tx) error {
+               db, err := tx.CreateBucketIfNotExists(dataBucketKey)
+               if err != nil {
+                       return err
+               }
+               ci := off / chunkSize
+               off %= chunkSize
+               for len(b) != 0 {
+                       _b := make([]byte, chunkSize)
+                       ck := me.chunkKey(int(ci))
+                       copy(_b, db.Get(ck[:]))
+                       n1 := copy(_b[off:], b)
+                       db.Put(ck[:], _b)
+                       if n1 > len(b) {
+                               break
+                       }
+                       b = b[n1:]
+                       off = 0
+                       ci++
+                       n += n1
+               }
+               return nil
+       })
+       return
+}
index 09562ce864ed05850d1c7fcec3995211b8f635e2..6c8de430d6ca7aa1e0a596a7d53ba505e3a8fd4e 100644 (file)
@@ -15,16 +15,6 @@ const (
        chunkSize = 1 << 14
 )
 
-var (
-       // The key for the data bucket.
-       data = []byte("data")
-       // The key for the completion flag bucket.
-       completed = []byte("completed")
-       // The value to assigned to pieces that are complete in the completed
-       // bucket.
-       completedValue = []byte{1}
-)
-
 type boltDBClient struct {
        db *bolt.DB
 }
@@ -34,12 +24,6 @@ type boltDBTorrent struct {
        ih metainfo.Hash
 }
 
-type boltDBPiece struct {
-       db  *bolt.DB
-       p   metainfo.Piece
-       key [24]byte
-}
-
 func NewBoltDB(filePath string) ClientImpl {
        ret := &boltDBClient{}
        var err error
@@ -59,102 +43,14 @@ func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash)
 }
 
 func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
-       ret := &boltDBPiece{p: p, db: me.cl.db}
+       ret := &boltDBPiece{
+               p:  p,
+               db: me.cl.db,
+               ih: me.ih,
+       }
        copy(ret.key[:], me.ih[:])
        binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index()))
        return ret
 }
 
 func (boltDBTorrent) Close() error { return nil }
-
-func (me *boltDBPiece) GetIsComplete() (complete bool) {
-       err := me.db.View(func(tx *bolt.Tx) error {
-               cb := tx.Bucket(completed)
-               // db := tx.Bucket(data)
-               complete =
-                       cb != nil && len(cb.Get(me.key[:])) != 0
-                       // db != nil && int64(len(db.Get(me.key[:]))) == me.p.Length()
-               return nil
-       })
-       if err != nil {
-               panic(err)
-       }
-       return
-}
-
-func (me *boltDBPiece) MarkComplete() error {
-       return me.db.Update(func(tx *bolt.Tx) error {
-               b, err := tx.CreateBucketIfNotExists(completed)
-               if err != nil {
-                       return err
-               }
-               return b.Put(me.key[:], completedValue)
-       })
-}
-
-func (me *boltDBPiece) MarkNotComplete() error {
-       return me.db.Update(func(tx *bolt.Tx) error {
-               b := tx.Bucket(completed)
-               if b == nil {
-                       return nil
-               }
-               return b.Delete(me.key[:])
-       })
-}
-func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
-       err = me.db.View(func(tx *bolt.Tx) error {
-               db := tx.Bucket(data)
-               if db == nil {
-                       return nil
-               }
-               ci := off / chunkSize
-               off %= chunkSize
-               for len(b) != 0 {
-                       ck := me.chunkKey(int(ci))
-                       _b := db.Get(ck[:])
-                       if len(_b) != chunkSize {
-                               break
-                       }
-                       n1 := copy(b, _b[off:])
-                       off = 0
-                       ci++
-                       b = b[n1:]
-                       n += n1
-               }
-               return nil
-       })
-       return
-}
-
-func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
-       copy(ret[:], me.key[:])
-       binary.BigEndian.PutUint16(ret[24:], uint16(index))
-       return
-}
-
-func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
-       err = me.db.Update(func(tx *bolt.Tx) error {
-               db, err := tx.CreateBucketIfNotExists(data)
-               if err != nil {
-                       return err
-               }
-               ci := off / chunkSize
-               off %= chunkSize
-               for len(b) != 0 {
-                       _b := make([]byte, chunkSize)
-                       ck := me.chunkKey(int(ci))
-                       copy(_b, db.Get(ck[:]))
-                       n1 := copy(_b[off:], b)
-                       db.Put(ck[:], _b)
-                       if n1 > len(b) {
-                               break
-                       }
-                       b = b[n1:]
-                       off = 0
-                       ci++
-                       n += n1
-               }
-               return nil
-       })
-       return
-}
index 3f5572e711ae97316fe5c5c4a230679975a5b270..53fdbdb590a27a7e26c8c233779e7b10a7ae8acc 100644 (file)
@@ -24,17 +24,17 @@ func TestBoltPieceCompletion(t *testing.T) {
 
        b, err := pc.Get(pk)
        require.NoError(t, err)
-       assert.False(t, b)
+       assert.False(t, b.Ok)
 
        require.NoError(t, pc.Set(pk, false))
 
        b, err = pc.Get(pk)
        require.NoError(t, err)
-       assert.False(t, b)
+       assert.Equal(t, Completion{Complete: false, Ok: true}, b)
 
        require.NoError(t, pc.Set(pk, true))
 
        b, err = pc.Get(pk)
        require.NoError(t, err)
-       assert.True(t, b)
+       assert.Equal(t, Completion{Complete: true, Ok: true}, b)
 }
index c464a0a7b5e06dfad2aaa1a0c852012a251bf67e..3d2e4d77dd191e85082ac04df9da6fc5740069c1 100644 (file)
@@ -6,10 +6,14 @@ import (
        "github.com/anacrolix/torrent/metainfo"
 )
 
+type PieceCompletionGetSetter interface {
+       Get(metainfo.PieceKey) (Completion, error)
+       Set(metainfo.PieceKey, bool) error
+}
+
 // Implementations track the completion of pieces. It must be concurrent-safe.
 type PieceCompletion interface {
-       Get(metainfo.PieceKey) (bool, error)
-       Set(metainfo.PieceKey, bool) error
+       PieceCompletionGetSetter
        Close() error
 }
 
index ebd98259edebbaeb5a7472758b63059a759027ba..e12aca7c7621ddbdde75947ecf77d5c8ffc6a242 100644 (file)
@@ -8,32 +8,30 @@ import (
 
 type mapPieceCompletion struct {
        mu sync.Mutex
-       m  map[metainfo.PieceKey]struct{}
+       m  map[metainfo.PieceKey]bool
 }
 
+var _ PieceCompletion = (*mapPieceCompletion)(nil)
+
 func NewMapPieceCompletion() PieceCompletion {
-       return &mapPieceCompletion{m: make(map[metainfo.PieceKey]struct{})}
+       return &mapPieceCompletion{m: make(map[metainfo.PieceKey]bool)}
 }
 
 func (*mapPieceCompletion) Close() error { return nil }
 
-func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (bool, error) {
+func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
        me.mu.Lock()
-       _, ok := me.m[pk]
-       me.mu.Unlock()
-       return ok, nil
+       defer me.mu.Unlock()
+       c.Complete, c.Ok = me.m[pk]
+       return
 }
 
 func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
        me.mu.Lock()
-       if b {
-               if me.m == nil {
-                       me.m = make(map[metainfo.PieceKey]struct{})
-               }
-               me.m[pk] = struct{}{}
-       } else {
-               delete(me.m, pk)
+       defer me.mu.Unlock()
+       if me.m == nil {
+               me.m = make(map[metainfo.PieceKey]bool)
        }
-       me.mu.Unlock()
+       me.m[pk] = b
        return nil
 }
index e6645ee7fcce9117599c89f4f7c5fffa1c66c8e6..25621b1f7f74f42c4bfebd2c1da1b9c8c59f23df 100644 (file)
@@ -86,7 +86,7 @@ func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
        // Create a view onto the file-based torrent storage.
        _io := fileTorrentImplIO{fts}
        // Return the appropriate segments of this.
-       return &fileStoragePiece{
+       return &filePieceImpl{
                fts,
                p,
                missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
diff --git a/storage/file_piece.go b/storage/file_piece.go
new file mode 100644 (file)
index 0000000..ffa37be
--- /dev/null
@@ -0,0 +1,50 @@
+package storage
+
+import (
+       "io"
+       "os"
+
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+type filePieceImpl struct {
+       *fileTorrentImpl
+       p metainfo.Piece
+       io.WriterAt
+       io.ReaderAt
+}
+
+var _ PieceImpl = (*filePieceImpl)(nil)
+
+func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
+       return metainfo.PieceKey{me.infoHash, me.p.Index()}
+}
+
+func (fs *filePieceImpl) Completion() Completion {
+       c, err := fs.completion.Get(fs.pieceKey())
+       if err != nil || !c.Ok {
+               return Completion{Ok: false}
+       }
+       // If it's allegedly complete, check that its constituent files have the
+       // necessary length.
+       for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
+               s, err := os.Stat(fs.fileInfoName(fi))
+               if err != nil || s.Size() < fi.Length {
+                       c.Complete = false
+                       break
+               }
+       }
+       if !c.Complete {
+               // The completion was wrong, fix it.
+               fs.completion.Set(fs.pieceKey(), false)
+       }
+       return c
+}
+
+func (fs *filePieceImpl) MarkComplete() error {
+       return fs.completion.Set(fs.pieceKey(), true)
+}
+
+func (fs *filePieceImpl) MarkNotComplete() error {
+       return fs.completion.Set(fs.pieceKey(), false)
+}
diff --git a/storage/file_storage_piece.go b/storage/file_storage_piece.go
deleted file mode 100644 (file)
index 321440a..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-package storage
-
-import (
-       "io"
-       "os"
-
-       "github.com/anacrolix/torrent/metainfo"
-)
-
-type fileStoragePiece struct {
-       *fileTorrentImpl
-       p metainfo.Piece
-       io.WriterAt
-       io.ReaderAt
-}
-
-func (me *fileStoragePiece) pieceKey() metainfo.PieceKey {
-       return metainfo.PieceKey{me.infoHash, me.p.Index()}
-}
-
-func (fs *fileStoragePiece) GetIsComplete() bool {
-       ret, err := fs.completion.Get(fs.pieceKey())
-       if err != nil || !ret {
-               return false
-       }
-       // If it's allegedly complete, check that its constituent files have the
-       // necessary length.
-       for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
-               s, err := os.Stat(fs.fileInfoName(fi))
-               if err != nil || s.Size() < fi.Length {
-                       ret = false
-                       break
-               }
-       }
-       if ret {
-               return true
-       }
-       // The completion was wrong, fix it.
-       fs.completion.Set(fs.pieceKey(), false)
-       return false
-}
-
-func (fs *fileStoragePiece) MarkComplete() error {
-       fs.completion.Set(fs.pieceKey(), true)
-       return nil
-}
-
-func (fs *fileStoragePiece) MarkNotComplete() error {
-       fs.completion.Set(fs.pieceKey(), false)
-       return nil
-}
index 80182c9644e4943a854eaefcac64193acd128d5c..307ba8934c9faa6b691865f64079d7b0e96a7c90 100644 (file)
@@ -31,5 +31,10 @@ type PieceImpl interface {
        MarkComplete() error
        MarkNotComplete() error
        // Returns true if the piece is complete.
-       GetIsComplete() bool
+       Completion() Completion
+}
+
+type Completion struct {
+       Complete bool
+       Ok       bool
 }
index bb2c2fe9a3f19ec350db6a271e2f03d436800a64..d47410dae3c287c56c9b2d6bb66a4f309f75cc3a 100644 (file)
@@ -29,7 +29,7 @@ func testIssue95(t *testing.T, c ClientImpl) {
        require.NoError(t, err)
        t2p := t2.Piece(i2.Piece(0))
        assert.NoError(t, t1.Close())
-       assert.NotPanics(t, func() { t2p.GetIsComplete() })
+       assert.NotPanics(t, func() { t2p.Completion() })
 }
 
 func TestIssue95File(t *testing.T) {
index b7267ba4ca76aaffae289ef1dbd58f33302772d8..1bdbbf15cb73e7bde6fb701c0e3347b527e14def 100644 (file)
@@ -27,7 +27,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl)
        n, err := p.ReadAt(make([]byte, 1), 0)
        require.Error(t, err)
        require.EqualValues(t, 0, n)
-       require.False(t, p.GetIsComplete())
+       require.False(t, p.Completion().Complete)
 }
 
 func TestMarkedCompleteMissingOnReadFile(t *testing.T) {
index fec9d117495dbbf6cb8a5b6f4fee8fdc9157567f..0dc512d431b8bab51e64917076951aac34e709a5 100644 (file)
@@ -76,9 +76,9 @@ func (me mmapStoragePiece) pieceKey() metainfo.PieceKey {
        return metainfo.PieceKey{me.ih, me.p.Index()}
 }
 
-func (sp mmapStoragePiece) GetIsComplete() (ret bool) {
-       ret, _ = sp.pc.Get(sp.pieceKey())
-       return
+func (sp mmapStoragePiece) Completion() Completion {
+       c, _ := sp.pc.Get(sp.pieceKey())
+       return c
 }
 
 func (sp mmapStoragePiece) MarkComplete() error {
index e46d923e92b5edfbd4ab7018b2242cea04dd57b5..1064fae557e9c4559a13253c751810c13f1b839b 100644 (file)
@@ -48,9 +48,12 @@ type piecePerResourcePiece struct {
        i resource.Instance
 }
 
-func (s piecePerResourcePiece) GetIsComplete() bool {
+func (s piecePerResourcePiece) Completion() Completion {
        fi, err := s.c.Stat()
-       return err == nil && fi.Size() == s.p.Length()
+       return Completion{
+               Complete: err == nil && fi.Size() == s.p.Length(),
+               Ok:       true,
+       }
 }
 
 func (s piecePerResourcePiece) MarkComplete() error {
@@ -62,7 +65,7 @@ func (s piecePerResourcePiece) MarkNotComplete() error {
 }
 
 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
-       if s.GetIsComplete() {
+       if s.Completion().Complete {
                return s.c.ReadAt(b, off)
        } else {
                return s.i.ReadAt(b, off)
index b4137e5be8e88fc3507b2729b27e5f8b9be89075..b8ba050b5503cd1729bff7f35e9d9285ffbe6283 100644 (file)
@@ -37,8 +37,9 @@ type Piece struct {
 }
 
 func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
-       if p.GetIsComplete() {
-               err = errors.New("piece completed")
+       c := p.Completion()
+       if c.Ok && c.Complete {
+               err = errors.New("piece already completed")
                return
        }
        if off+int64(len(b)) > p.mip.Length() {
index 3374778b2c12bedd0b362fdc691faf14a896a846..03713580c1d2fae5453efc068f5a532dfbb6cd1d 100644 (file)
@@ -112,6 +112,8 @@ type Torrent struct {
        pendingPieces bitmap.Bitmap
        // A cache of completed piece indices.
        completedPieces bitmap.Bitmap
+       // Pieces that need to be hashed.
+       piecesQueuedForHash bitmap.Bitmap
 
        // A pool of piece priorities []int for assignment to new connections.
        // These "inclinations" are used to give connections preference for
@@ -190,8 +192,8 @@ func (t *Torrent) pieceComplete(piece int) bool {
        return t.completedPieces.Get(piece)
 }
 
-func (t *Torrent) pieceCompleteUncached(piece int) bool {
-       return t.pieces[piece].Storage().GetIsComplete()
+func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
+       return t.pieces[piece].Storage().Completion()
 }
 
 // There's a connection to that address already.
@@ -332,13 +334,12 @@ func (t *Torrent) setInfoBytes(b []byte) error {
        }
        for i := range t.pieces {
                t.updatePieceCompletion(i)
-               // t.pieces[i].QueuedForHash = true
+               p := &t.pieces[i]
+               if !p.storageCompletionOk {
+                       log.Printf("piece %s completion unknown, queueing check", p)
+                       t.queuePieceCheck(i)
+               }
        }
-       // go func() {
-       //      for i := range t.pieces {
-       //              t.verifyPiece(i)
-       //      }
-       // }()
        return nil
 }
 
@@ -392,7 +393,7 @@ func (t *Torrent) pieceState(index int) (ret PieceState) {
        if t.pieceComplete(index) {
                ret.Complete = true
        }
-       if p.queuedForHash || p.hashing {
+       if p.queuedForHash() || p.hashing {
                ret.Checking = true
        }
        if !ret.Complete && t.piecePartiallyDownloaded(index) {
@@ -738,7 +739,7 @@ func (t *Torrent) wantPieceIndex(index int) bool {
                return false
        }
        p := &t.pieces[index]
-       if p.queuedForHash {
+       if p.queuedForHash() {
                return false
        }
        if p.hashing {
@@ -1005,8 +1006,10 @@ func (t *Torrent) putPieceInclination(pi []int) {
 
 func (t *Torrent) updatePieceCompletion(piece int) {
        pcu := t.pieceCompleteUncached(piece)
-       changed := t.completedPieces.Get(piece) != pcu
-       t.completedPieces.Set(piece, pcu)
+       p := &t.pieces[piece]
+       changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
+       p.storageCompletionOk = pcu.Ok
+       t.completedPieces.Set(piece, pcu.Complete)
        if changed {
                t.pieceCompletionChanged(piece)
        }
@@ -1517,12 +1520,19 @@ func (t *Torrent) verifyPiece(piece int) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
        p := &t.pieces[piece]
+       defer func() {
+               p.numVerifies++
+               cl.event.Broadcast()
+       }()
        for p.hashing || t.storage == nil {
                cl.event.Wait()
        }
-       p.queuedForHash = false
+       if !p.t.piecesQueuedForHash.Remove(piece) {
+               panic("piece was not queued")
+       }
        if t.closed.IsSet() || t.pieceComplete(piece) {
                t.updatePiecePriority(piece)
+               log.Println("early return", t.closed.IsSet(), t.pieceComplete(piece))
                return
        }
        p.hashing = true
@@ -1530,7 +1540,6 @@ func (t *Torrent) verifyPiece(piece int) {
        cl.mu.Unlock()
        sum := t.hashPiece(piece)
        cl.mu.Lock()
-       p.numVerifies++
        p.hashing = false
        t.pieceHashed(piece, sum == p.hash)
 }
@@ -1557,10 +1566,10 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {
 // Currently doesn't really queue, but should in the future.
 func (t *Torrent) queuePieceCheck(pieceIndex int) {
        piece := &t.pieces[pieceIndex]
-       if piece.queuedForHash {
+       if piece.queuedForHash() {
                return
        }
-       piece.queuedForHash = true
+       t.piecesQueuedForHash.Add(pieceIndex)
        t.publishPieceChange(pieceIndex)
        go t.verifyPiece(pieceIndex)
 }