github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0
github.com/alexflint/go-arg v1.4.3
github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d
- github.com/anacrolix/chansync v0.5.1
+ github.com/anacrolix/chansync v0.6.0
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
github.com/anacrolix/envpprof v1.3.0
github.com/anacrolix/fuse v0.2.0
github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ=
github.com/anacrolix/chansync v0.5.1 h1:j+R9DtotkXm40VFjZ8rJTSJkg2Gv1ldZt8kl96lyJJ0=
github.com/anacrolix/chansync v0.5.1/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k=
+github.com/anacrolix/chansync v0.6.0 h1:/aQVvZ1yLRhmqEYrr9dC92JwzNBQ/SNnFi4uk+fTkQY=
+github.com/anacrolix/chansync v0.6.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k=
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 h1:8V0K09lrGoeT2KRJNOtspA7q+OMxGwQqK/Ug0IiaaRE=
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444/go.mod h1:MctKM1HS5YYDb3F30NGJxLE+QPuqWoT5ReW/4jt8xew=
github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c=
package torrent
import (
+ "context"
"errors"
"fmt"
"sync"
"github.com/anacrolix/torrent/storage"
)
+// Why is it an int64?
+type pieceVerifyCount = int64
+
type Piece struct {
// The completed piece SHA1 hash, from the metainfo "pieces" field. Nil if the info is not V1
// compatible.
readerCond chansync.BroadcastCond
- numVerifies int64
+ numVerifies pieceVerifyCount
+ numVerifiesCond chansync.BroadcastCond
hashing bool
marking bool
storageCompletionOk bool
}
// Forces the piece data to be rehashed.
-func (p *Piece) VerifyData() {
- p.t.cl.lock()
- defer p.t.cl.unlock()
- target := p.numVerifies + 1
- if p.hashing {
- target++
+func (p *Piece) VerifyData() error {
+ return p.VerifyDataContext(context.Background())
+}
+
+// Forces the piece data to be rehashed. This might be a temporary method until
+// an event-based one is created. Possibly this blocking style is more suited to
+// external control of hashing concurrency.
+func (p *Piece) VerifyDataContext(ctx context.Context) error {
+ locker := p.t.cl.locker()
+ locker.Lock()
+ target, err := p.t.queuePieceCheck(p.index)
+ locker.Unlock()
+ if err != nil {
+ return err
}
- // log.Printf("target: %d", target)
- p.t.queuePieceCheck(p.index)
+ //log.Printf("target: %d", target)
for {
- // log.Printf("got %d verifies", p.numVerifies)
- if p.numVerifies >= target {
- break
+ locker.RLock()
+ done := p.numVerifies >= target
+ //log.Printf("got %d verifies", p.numVerifies)
+ numVerifiesChanged := p.numVerifiesCond.Signaled()
+ locker.RUnlock()
+ if done {
+ //log.Print("done")
+ return nil
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.t.closed.Done():
+ return errTorrentClosed
+ case <-numVerifiesChanged:
}
- p.t.cl.event.Wait()
}
- // log.Print("done")
}
func (p *Piece) queuedForHash() bool {
if cur != p.publicPieceState {
p.publicPieceState = cur
t.pieceStateChanges.Publish(PieceStateChange{
- int(piece),
+ piece,
cur,
})
}
})
p := t.piece(piece)
p.numVerifies++
+ p.numVerifiesCond.Broadcast()
t.cl.event.Broadcast()
if t.closed.IsSet() {
return
})
}
-func (t *Torrent) tryCreateMorePieceHashers() {
- for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
+func (t *Torrent) tryCreateMorePieceHashers() error {
+ if t.closed.IsSet() {
+ return errTorrentClosed
}
+ for t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
+ }
+ return nil
}
func (t *Torrent) tryCreatePieceHasher() bool {
}
}
-func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
+func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVerifyCount, err error) {
piece := t.piece(pieceIndex)
if !piece.haveHash() {
- return
+ err = errors.New("piece hash unknown")
+ }
+ targetVerifies = piece.numVerifies + 1
+ if piece.hashing {
+ // The result of this queued piece check will be the one after the current one.
+ targetVerifies++
}
if piece.queuedForHash() {
return
t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
t.publishPieceStateChange(pieceIndex)
t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
- t.tryCreateMorePieceHashers()
+ err = t.tryCreateMorePieceHashers()
+ return
}
// Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
// before the Info is available.
func (t *Torrent) VerifyData() {
- for i := pieceIndex(0); i < t.NumPieces(); i++ {
+ for i := 0; i < t.NumPieces(); i++ {
t.Piece(i).VerifyData()
}
}