From: Matt Joiner Date: Thu, 12 Oct 2017 05:09:32 +0000 (+1100) Subject: Track completion known to implementation state X-Git-Tag: v1.0.0~368 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=dd083a4e11a09fd9fbe5209ecc0274278aeeceec;p=btrtrc.git Track completion known to implementation state Addresses #193 --- diff --git a/client_test.go b/client_test.go index fc008b99..539bb448 100644 --- a/client_test.go +++ b/client_test.go @@ -537,12 +537,14 @@ type badStoragePiece struct { p metainfo.Piece } +var _ storage.PieceImpl = badStoragePiece{} + func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) { return 0, nil } -func (p badStoragePiece) GetIsComplete() bool { - return true +func (p badStoragePiece) Completion() storage.Completion { + return storage.Completion{Complete: true, Ok: true} } func (p badStoragePiece) MarkComplete() error { diff --git a/connection_test.go b/connection_test.go index fad120c8..3f6dfd93 100644 --- a/connection_test.go +++ b/connection_test.go @@ -58,8 +58,8 @@ func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl { return me } -func (me *torrentStorage) GetIsComplete() bool { - return false +func (me *torrentStorage) Completion() storage.Completion { + return storage.Completion{} } func (me *torrentStorage) MarkComplete() error { diff --git a/piece.go b/piece.go index e09e00c8..55e813af 100644 --- a/piece.go +++ b/piece.go @@ -1,6 +1,8 @@ package torrent import ( + "fmt" + "log" "sync" "github.com/anacrolix/missinggo/bitmap" @@ -38,10 +40,10 @@ type Piece struct { // length can be determined by the request chunkSize in use. dirtyChunks bitmap.Bitmap - hashing bool - queuedForHash bool - everHashed bool - numVerifies int64 + hashing bool + everHashed bool + numVerifies int64 + storageCompletionOk bool publicPieceState PieceState priority piecePriority @@ -51,6 +53,10 @@ type Piece struct { noPendingWrites sync.Cond } +func (p *Piece) String() string { + return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index) +} + func (p *Piece) Info() metainfo.Piece { return p.t.info.Piece(p.index) } @@ -168,8 +174,15 @@ func (p *Piece) VerifyData() { if p.hashing { target++ } + log.Printf("target: %d", target) p.t.queuePieceCheck(p.index) for p.numVerifies < target { + log.Printf("got %d verifies", p.numVerifies) p.t.cl.event.Wait() } + log.Print("done") +} + +func (p *Piece) queuedForHash() bool { + return p.t.piecesQueuedForHash.Get(p.index) } diff --git a/storage/boltPieceCompletion.go b/storage/boltPieceCompletion.go index 52e38f6a..93cf8d6f 100644 --- a/storage/boltPieceCompletion.go +++ b/storage/boltPieceCompletion.go @@ -11,14 +11,21 @@ import ( "github.com/anacrolix/torrent/metainfo" ) +const ( + boltDbCompleteValue = "c" + boltDbIncompleteValue = "i" +) + var ( - value = []byte{} + completionBucketKey = []byte("completion") ) type boltPieceCompletion struct { db *bolt.DB } +var _ PieceCompletion = (*boltPieceCompletion)(nil) + func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) { os.MkdirAll(dir, 0770) p := filepath.Join(dir, ".torrent.bolt.db") @@ -32,27 +39,35 @@ func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) { return } -func (me *boltPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) { +func (me boltPieceCompletion) Get(pk metainfo.PieceKey) (cn Completion, err error) { err = me.db.View(func(tx *bolt.Tx) error { - c := tx.Bucket(completed) - if c == nil { + cb := tx.Bucket(completionBucketKey) + if cb == nil { return nil } - ih := c.Bucket(pk.InfoHash[:]) + ih := cb.Bucket(pk.InfoHash[:]) if ih == nil { return nil } var key [4]byte binary.BigEndian.PutUint32(key[:], uint32(pk.Index)) - ret = ih.Get(key[:]) != nil + cn.Ok = true + switch string(ih.Get(key[:])) { + case boltDbCompleteValue: + cn.Complete = true + case boltDbIncompleteValue: + cn.Complete = false + default: + cn.Ok = false + } return nil }) return } -func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { +func (me boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { return me.db.Update(func(tx *bolt.Tx) error { - c, err := tx.CreateBucketIfNotExists(completed) + c, err := tx.CreateBucketIfNotExists(completionBucketKey) if err != nil { return err } @@ -62,11 +77,13 @@ func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { } var key [4]byte binary.BigEndian.PutUint32(key[:], uint32(pk.Index)) - if b { - return ih.Put(key[:], value) - } else { - return ih.Delete(key[:]) - } + return ih.Put(key[:], []byte(func() string { + if b { + return boltDbCompleteValue + } else { + return boltDbIncompleteValue + } + }())) }) } diff --git a/storage/bolt_piece.go b/storage/bolt_piece.go new file mode 100644 index 00000000..a9e22bba --- /dev/null +++ b/storage/bolt_piece.go @@ -0,0 +1,100 @@ +package storage + +import ( + "encoding/binary" + + "github.com/anacrolix/missinggo/x" + "github.com/anacrolix/torrent/metainfo" + "github.com/boltdb/bolt" +) + +type boltDBPiece struct { + db *bolt.DB + p metainfo.Piece + ih metainfo.Hash + key [24]byte +} + +var ( + _ PieceImpl = (*boltDBPiece)(nil) + dataBucketKey = []byte("data") +) + +func (me *boltDBPiece) pc() PieceCompletionGetSetter { + return boltPieceCompletion{me.db} +} + +func (me *boltDBPiece) pk() metainfo.PieceKey { + return metainfo.PieceKey{me.ih, me.p.Index()} +} + +func (me *boltDBPiece) Completion() Completion { + c, err := me.pc().Get(me.pk()) + x.Pie(err) + return c +} + +func (me *boltDBPiece) MarkComplete() error { + return me.pc().Set(me.pk(), true) +} + +func (me *boltDBPiece) MarkNotComplete() error { + return me.pc().Set(me.pk(), false) +} +func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) { + err = me.db.View(func(tx *bolt.Tx) error { + db := tx.Bucket(dataBucketKey) + if db == nil { + return nil + } + ci := off / chunkSize + off %= chunkSize + for len(b) != 0 { + ck := me.chunkKey(int(ci)) + _b := db.Get(ck[:]) + if len(_b) != chunkSize { + break + } + n1 := copy(b, _b[off:]) + off = 0 + ci++ + b = b[n1:] + n += n1 + } + return nil + }) + return +} + +func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) { + copy(ret[:], me.key[:]) + binary.BigEndian.PutUint16(ret[24:], uint16(index)) + return +} + +func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) { + err = me.db.Update(func(tx *bolt.Tx) error { + db, err := tx.CreateBucketIfNotExists(dataBucketKey) + if err != nil { + return err + } + ci := off / chunkSize + off %= chunkSize + for len(b) != 0 { + _b := make([]byte, chunkSize) + ck := me.chunkKey(int(ci)) + copy(_b, db.Get(ck[:])) + n1 := copy(_b[off:], b) + db.Put(ck[:], _b) + if n1 > len(b) { + break + } + b = b[n1:] + off = 0 + ci++ + n += n1 + } + return nil + }) + return +} diff --git a/storage/boltdb.go b/storage/boltdb.go index 09562ce8..6c8de430 100644 --- a/storage/boltdb.go +++ b/storage/boltdb.go @@ -15,16 +15,6 @@ const ( chunkSize = 1 << 14 ) -var ( - // The key for the data bucket. - data = []byte("data") - // The key for the completion flag bucket. - completed = []byte("completed") - // The value to assigned to pieces that are complete in the completed - // bucket. - completedValue = []byte{1} -) - type boltDBClient struct { db *bolt.DB } @@ -34,12 +24,6 @@ type boltDBTorrent struct { ih metainfo.Hash } -type boltDBPiece struct { - db *bolt.DB - p metainfo.Piece - key [24]byte -} - func NewBoltDB(filePath string) ClientImpl { ret := &boltDBClient{} var err error @@ -59,102 +43,14 @@ func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) } func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl { - ret := &boltDBPiece{p: p, db: me.cl.db} + ret := &boltDBPiece{ + p: p, + db: me.cl.db, + ih: me.ih, + } copy(ret.key[:], me.ih[:]) binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index())) return ret } func (boltDBTorrent) Close() error { return nil } - -func (me *boltDBPiece) GetIsComplete() (complete bool) { - err := me.db.View(func(tx *bolt.Tx) error { - cb := tx.Bucket(completed) - // db := tx.Bucket(data) - complete = - cb != nil && len(cb.Get(me.key[:])) != 0 - // db != nil && int64(len(db.Get(me.key[:]))) == me.p.Length() - return nil - }) - if err != nil { - panic(err) - } - return -} - -func (me *boltDBPiece) MarkComplete() error { - return me.db.Update(func(tx *bolt.Tx) error { - b, err := tx.CreateBucketIfNotExists(completed) - if err != nil { - return err - } - return b.Put(me.key[:], completedValue) - }) -} - -func (me *boltDBPiece) MarkNotComplete() error { - return me.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(completed) - if b == nil { - return nil - } - return b.Delete(me.key[:]) - }) -} -func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) { - err = me.db.View(func(tx *bolt.Tx) error { - db := tx.Bucket(data) - if db == nil { - return nil - } - ci := off / chunkSize - off %= chunkSize - for len(b) != 0 { - ck := me.chunkKey(int(ci)) - _b := db.Get(ck[:]) - if len(_b) != chunkSize { - break - } - n1 := copy(b, _b[off:]) - off = 0 - ci++ - b = b[n1:] - n += n1 - } - return nil - }) - return -} - -func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) { - copy(ret[:], me.key[:]) - binary.BigEndian.PutUint16(ret[24:], uint16(index)) - return -} - -func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) { - err = me.db.Update(func(tx *bolt.Tx) error { - db, err := tx.CreateBucketIfNotExists(data) - if err != nil { - return err - } - ci := off / chunkSize - off %= chunkSize - for len(b) != 0 { - _b := make([]byte, chunkSize) - ck := me.chunkKey(int(ci)) - copy(_b, db.Get(ck[:])) - n1 := copy(_b[off:], b) - db.Put(ck[:], _b) - if n1 > len(b) { - break - } - b = b[n1:] - off = 0 - ci++ - n += n1 - } - return nil - }) - return -} diff --git a/storage/boltpc_test.go b/storage/boltpc_test.go index 3f5572e7..53fdbdb5 100644 --- a/storage/boltpc_test.go +++ b/storage/boltpc_test.go @@ -24,17 +24,17 @@ func TestBoltPieceCompletion(t *testing.T) { b, err := pc.Get(pk) require.NoError(t, err) - assert.False(t, b) + assert.False(t, b.Ok) require.NoError(t, pc.Set(pk, false)) b, err = pc.Get(pk) require.NoError(t, err) - assert.False(t, b) + assert.Equal(t, Completion{Complete: false, Ok: true}, b) require.NoError(t, pc.Set(pk, true)) b, err = pc.Get(pk) require.NoError(t, err) - assert.True(t, b) + assert.Equal(t, Completion{Complete: true, Ok: true}, b) } diff --git a/storage/completion.go b/storage/completion.go index c464a0a7..3d2e4d77 100644 --- a/storage/completion.go +++ b/storage/completion.go @@ -6,10 +6,14 @@ import ( "github.com/anacrolix/torrent/metainfo" ) +type PieceCompletionGetSetter interface { + Get(metainfo.PieceKey) (Completion, error) + Set(metainfo.PieceKey, bool) error +} + // Implementations track the completion of pieces. It must be concurrent-safe. type PieceCompletion interface { - Get(metainfo.PieceKey) (bool, error) - Set(metainfo.PieceKey, bool) error + PieceCompletionGetSetter Close() error } diff --git a/storage/completion_piece_map.go b/storage/completion_piece_map.go index ebd98259..e12aca7c 100644 --- a/storage/completion_piece_map.go +++ b/storage/completion_piece_map.go @@ -8,32 +8,30 @@ import ( type mapPieceCompletion struct { mu sync.Mutex - m map[metainfo.PieceKey]struct{} + m map[metainfo.PieceKey]bool } +var _ PieceCompletion = (*mapPieceCompletion)(nil) + func NewMapPieceCompletion() PieceCompletion { - return &mapPieceCompletion{m: make(map[metainfo.PieceKey]struct{})} + return &mapPieceCompletion{m: make(map[metainfo.PieceKey]bool)} } func (*mapPieceCompletion) Close() error { return nil } -func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (bool, error) { +func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) { me.mu.Lock() - _, ok := me.m[pk] - me.mu.Unlock() - return ok, nil + defer me.mu.Unlock() + c.Complete, c.Ok = me.m[pk] + return } func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { me.mu.Lock() - if b { - if me.m == nil { - me.m = make(map[metainfo.PieceKey]struct{}) - } - me.m[pk] = struct{}{} - } else { - delete(me.m, pk) + defer me.mu.Unlock() + if me.m == nil { + me.m = make(map[metainfo.PieceKey]bool) } - me.mu.Unlock() + me.m[pk] = b return nil } diff --git a/storage/file.go b/storage/file.go index e6645ee7..25621b1f 100644 --- a/storage/file.go +++ b/storage/file.go @@ -86,7 +86,7 @@ func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl { // Create a view onto the file-based torrent storage. _io := fileTorrentImplIO{fts} // Return the appropriate segments of this. - return &fileStoragePiece{ + return &filePieceImpl{ fts, p, missinggo.NewSectionWriter(_io, p.Offset(), p.Length()), diff --git a/storage/file_piece.go b/storage/file_piece.go new file mode 100644 index 00000000..ffa37bee --- /dev/null +++ b/storage/file_piece.go @@ -0,0 +1,50 @@ +package storage + +import ( + "io" + "os" + + "github.com/anacrolix/torrent/metainfo" +) + +type filePieceImpl struct { + *fileTorrentImpl + p metainfo.Piece + io.WriterAt + io.ReaderAt +} + +var _ PieceImpl = (*filePieceImpl)(nil) + +func (me *filePieceImpl) pieceKey() metainfo.PieceKey { + return metainfo.PieceKey{me.infoHash, me.p.Index()} +} + +func (fs *filePieceImpl) Completion() Completion { + c, err := fs.completion.Get(fs.pieceKey()) + if err != nil || !c.Ok { + return Completion{Ok: false} + } + // If it's allegedly complete, check that its constituent files have the + // necessary length. + for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) { + s, err := os.Stat(fs.fileInfoName(fi)) + if err != nil || s.Size() < fi.Length { + c.Complete = false + break + } + } + if !c.Complete { + // The completion was wrong, fix it. + fs.completion.Set(fs.pieceKey(), false) + } + return c +} + +func (fs *filePieceImpl) MarkComplete() error { + return fs.completion.Set(fs.pieceKey(), true) +} + +func (fs *filePieceImpl) MarkNotComplete() error { + return fs.completion.Set(fs.pieceKey(), false) +} diff --git a/storage/file_storage_piece.go b/storage/file_storage_piece.go deleted file mode 100644 index 321440a7..00000000 --- a/storage/file_storage_piece.go +++ /dev/null @@ -1,51 +0,0 @@ -package storage - -import ( - "io" - "os" - - "github.com/anacrolix/torrent/metainfo" -) - -type fileStoragePiece struct { - *fileTorrentImpl - p metainfo.Piece - io.WriterAt - io.ReaderAt -} - -func (me *fileStoragePiece) pieceKey() metainfo.PieceKey { - return metainfo.PieceKey{me.infoHash, me.p.Index()} -} - -func (fs *fileStoragePiece) GetIsComplete() bool { - ret, err := fs.completion.Get(fs.pieceKey()) - if err != nil || !ret { - return false - } - // If it's allegedly complete, check that its constituent files have the - // necessary length. - for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) { - s, err := os.Stat(fs.fileInfoName(fi)) - if err != nil || s.Size() < fi.Length { - ret = false - break - } - } - if ret { - return true - } - // The completion was wrong, fix it. - fs.completion.Set(fs.pieceKey(), false) - return false -} - -func (fs *fileStoragePiece) MarkComplete() error { - fs.completion.Set(fs.pieceKey(), true) - return nil -} - -func (fs *fileStoragePiece) MarkNotComplete() error { - fs.completion.Set(fs.pieceKey(), false) - return nil -} diff --git a/storage/interface.go b/storage/interface.go index 80182c96..307ba893 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -31,5 +31,10 @@ type PieceImpl interface { MarkComplete() error MarkNotComplete() error // Returns true if the piece is complete. - GetIsComplete() bool + Completion() Completion +} + +type Completion struct { + Complete bool + Ok bool } diff --git a/storage/issue95_test.go b/storage/issue95_test.go index bb2c2fe9..d47410da 100644 --- a/storage/issue95_test.go +++ b/storage/issue95_test.go @@ -29,7 +29,7 @@ func testIssue95(t *testing.T, c ClientImpl) { require.NoError(t, err) t2p := t2.Piece(i2.Piece(0)) assert.NoError(t, t1.Close()) - assert.NotPanics(t, func() { t2p.GetIsComplete() }) + assert.NotPanics(t, func() { t2p.Completion() }) } func TestIssue95File(t *testing.T) { diff --git a/storage/issue96_test.go b/storage/issue96_test.go index b7267ba4..1bdbbf15 100644 --- a/storage/issue96_test.go +++ b/storage/issue96_test.go @@ -27,7 +27,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl) n, err := p.ReadAt(make([]byte, 1), 0) require.Error(t, err) require.EqualValues(t, 0, n) - require.False(t, p.GetIsComplete()) + require.False(t, p.Completion().Complete) } func TestMarkedCompleteMissingOnReadFile(t *testing.T) { diff --git a/storage/mmap.go b/storage/mmap.go index fec9d117..0dc512d4 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -76,9 +76,9 @@ func (me mmapStoragePiece) pieceKey() metainfo.PieceKey { return metainfo.PieceKey{me.ih, me.p.Index()} } -func (sp mmapStoragePiece) GetIsComplete() (ret bool) { - ret, _ = sp.pc.Get(sp.pieceKey()) - return +func (sp mmapStoragePiece) Completion() Completion { + c, _ := sp.pc.Get(sp.pieceKey()) + return c } func (sp mmapStoragePiece) MarkComplete() error { diff --git a/storage/piece_resource.go b/storage/piece_resource.go index e46d923e..1064fae5 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -48,9 +48,12 @@ type piecePerResourcePiece struct { i resource.Instance } -func (s piecePerResourcePiece) GetIsComplete() bool { +func (s piecePerResourcePiece) Completion() Completion { fi, err := s.c.Stat() - return err == nil && fi.Size() == s.p.Length() + return Completion{ + Complete: err == nil && fi.Size() == s.p.Length(), + Ok: true, + } } func (s piecePerResourcePiece) MarkComplete() error { @@ -62,7 +65,7 @@ func (s piecePerResourcePiece) MarkNotComplete() error { } func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { - if s.GetIsComplete() { + if s.Completion().Complete { return s.c.ReadAt(b, off) } else { return s.i.ReadAt(b, off) diff --git a/storage/wrappers.go b/storage/wrappers.go index b4137e5b..b8ba050b 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -37,8 +37,9 @@ type Piece struct { } func (p Piece) WriteAt(b []byte, off int64) (n int, err error) { - if p.GetIsComplete() { - err = errors.New("piece completed") + c := p.Completion() + if c.Ok && c.Complete { + err = errors.New("piece already completed") return } if off+int64(len(b)) > p.mip.Length() { diff --git a/torrent.go b/torrent.go index 3374778b..03713580 100644 --- a/torrent.go +++ b/torrent.go @@ -112,6 +112,8 @@ type Torrent struct { pendingPieces bitmap.Bitmap // A cache of completed piece indices. completedPieces bitmap.Bitmap + // Pieces that need to be hashed. + piecesQueuedForHash bitmap.Bitmap // A pool of piece priorities []int for assignment to new connections. // These "inclinations" are used to give connections preference for @@ -190,8 +192,8 @@ func (t *Torrent) pieceComplete(piece int) bool { return t.completedPieces.Get(piece) } -func (t *Torrent) pieceCompleteUncached(piece int) bool { - return t.pieces[piece].Storage().GetIsComplete() +func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion { + return t.pieces[piece].Storage().Completion() } // There's a connection to that address already. @@ -332,13 +334,12 @@ func (t *Torrent) setInfoBytes(b []byte) error { } for i := range t.pieces { t.updatePieceCompletion(i) - // t.pieces[i].QueuedForHash = true + p := &t.pieces[i] + if !p.storageCompletionOk { + log.Printf("piece %s completion unknown, queueing check", p) + t.queuePieceCheck(i) + } } - // go func() { - // for i := range t.pieces { - // t.verifyPiece(i) - // } - // }() return nil } @@ -392,7 +393,7 @@ func (t *Torrent) pieceState(index int) (ret PieceState) { if t.pieceComplete(index) { ret.Complete = true } - if p.queuedForHash || p.hashing { + if p.queuedForHash() || p.hashing { ret.Checking = true } if !ret.Complete && t.piecePartiallyDownloaded(index) { @@ -738,7 +739,7 @@ func (t *Torrent) wantPieceIndex(index int) bool { return false } p := &t.pieces[index] - if p.queuedForHash { + if p.queuedForHash() { return false } if p.hashing { @@ -1005,8 +1006,10 @@ func (t *Torrent) putPieceInclination(pi []int) { func (t *Torrent) updatePieceCompletion(piece int) { pcu := t.pieceCompleteUncached(piece) - changed := t.completedPieces.Get(piece) != pcu - t.completedPieces.Set(piece, pcu) + p := &t.pieces[piece] + changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok + p.storageCompletionOk = pcu.Ok + t.completedPieces.Set(piece, pcu.Complete) if changed { t.pieceCompletionChanged(piece) } @@ -1517,12 +1520,19 @@ func (t *Torrent) verifyPiece(piece int) { cl.mu.Lock() defer cl.mu.Unlock() p := &t.pieces[piece] + defer func() { + p.numVerifies++ + cl.event.Broadcast() + }() for p.hashing || t.storage == nil { cl.event.Wait() } - p.queuedForHash = false + if !p.t.piecesQueuedForHash.Remove(piece) { + panic("piece was not queued") + } if t.closed.IsSet() || t.pieceComplete(piece) { t.updatePiecePriority(piece) + log.Println("early return", t.closed.IsSet(), t.pieceComplete(piece)) return } p.hashing = true @@ -1530,7 +1540,6 @@ func (t *Torrent) verifyPiece(piece int) { cl.mu.Unlock() sum := t.hashPiece(piece) cl.mu.Lock() - p.numVerifies++ p.hashing = false t.pieceHashed(piece, sum == p.hash) } @@ -1557,10 +1566,10 @@ func (t *Torrent) connsAsSlice() (ret []*connection) { // Currently doesn't really queue, but should in the future. func (t *Torrent) queuePieceCheck(pieceIndex int) { piece := &t.pieces[pieceIndex] - if piece.queuedForHash { + if piece.queuedForHash() { return } - piece.queuedForHash = true + t.piecesQueuedForHash.Add(pieceIndex) t.publishPieceChange(pieceIndex) go t.verifyPiece(pieceIndex) }