]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add Piece.VerifyDataContext and improve piece VerifyData concurrency
authorMatt Joiner <anacrolix@gmail.com>
Wed, 9 Apr 2025 02:27:19 +0000 (12:27 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 9 Apr 2025 02:27:19 +0000 (12:27 +1000)
go.mod
go.sum
piece.go
torrent.go

diff --git a/go.mod b/go.mod
index 5597848a2543f180dd0bd00ef4fe7056f0ab8a5e..cbd06831da353ca4031fbc4a618b53f1064c807c 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@ require (
        github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0
        github.com/alexflint/go-arg v1.4.3
        github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d
-       github.com/anacrolix/chansync v0.5.1
+       github.com/anacrolix/chansync v0.6.0
        github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
        github.com/anacrolix/envpprof v1.3.0
        github.com/anacrolix/fuse v0.2.0
diff --git a/go.sum b/go.sum
index 96aec451c04a7a48bc58dc7c60afa81804143964..882fee6be223de3016c2dd2bd8eb28ae2356854d 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -70,6 +70,8 @@ github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d h1:ypNOsIwvdumNRl
 github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ=
 github.com/anacrolix/chansync v0.5.1 h1:j+R9DtotkXm40VFjZ8rJTSJkg2Gv1ldZt8kl96lyJJ0=
 github.com/anacrolix/chansync v0.5.1/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k=
+github.com/anacrolix/chansync v0.6.0 h1:/aQVvZ1yLRhmqEYrr9dC92JwzNBQ/SNnFi4uk+fTkQY=
+github.com/anacrolix/chansync v0.6.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k=
 github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 h1:8V0K09lrGoeT2KRJNOtspA7q+OMxGwQqK/Ug0IiaaRE=
 github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444/go.mod h1:MctKM1HS5YYDb3F30NGJxLE+QPuqWoT5ReW/4jt8xew=
 github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c=
index 7dd3f61e309e59f0f1cce0f299f4db267eca59b5..66ae2b0c862a98a8bf0b91d553607c58bf858181 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "context"
        "errors"
        "fmt"
        "sync"
@@ -15,6 +16,9 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
+// Why is it an int64?
+type pieceVerifyCount = int64
+
 type Piece struct {
        // The completed piece SHA1 hash, from the metainfo "pieces" field. Nil if the info is not V1
        // compatible.
@@ -26,7 +30,8 @@ type Piece struct {
 
        readerCond chansync.BroadcastCond
 
-       numVerifies         int64
+       numVerifies         pieceVerifyCount
+       numVerifiesCond     chansync.BroadcastCond
        hashing             bool
        marking             bool
        storageCompletionOk bool
@@ -177,23 +182,40 @@ func (p *Piece) bytesLeft() (ret pp.Integer) {
 }
 
 // Forces the piece data to be rehashed.
-func (p *Piece) VerifyData() {
-       p.t.cl.lock()
-       defer p.t.cl.unlock()
-       target := p.numVerifies + 1
-       if p.hashing {
-               target++
+func (p *Piece) VerifyData() error {
+       return p.VerifyDataContext(context.Background())
+}
+
+// Forces the piece data to be rehashed. This might be a temporary method until
+// an event-based one is created. Possibly this blocking style is more suited to
+// external control of hashing concurrency.
+func (p *Piece) VerifyDataContext(ctx context.Context) error {
+       locker := p.t.cl.locker()
+       locker.Lock()
+       target, err := p.t.queuePieceCheck(p.index)
+       locker.Unlock()
+       if err != nil {
+               return err
        }
-       // log.Printf("target: %d", target)
-       p.t.queuePieceCheck(p.index)
+       //log.Printf("target: %d", target)
        for {
-               // log.Printf("got %d verifies", p.numVerifies)
-               if p.numVerifies >= target {
-                       break
+               locker.RLock()
+               done := p.numVerifies >= target
+               //log.Printf("got %d verifies", p.numVerifies)
+               numVerifiesChanged := p.numVerifiesCond.Signaled()
+               locker.RUnlock()
+               if done {
+                       //log.Print("done")
+                       return nil
+               }
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-p.t.closed.Done():
+                       return errTorrentClosed
+               case <-numVerifiesChanged:
                }
-               p.t.cl.event.Wait()
        }
-       // log.Print("done")
 }
 
 func (p *Piece) queuedForHash() bool {
index 3154662d19b5de9e2b44aea42c1b4c6f9277d340..f8cfec77c1068bdc74baee7e52a835826133b6ae 100644 (file)
@@ -1369,7 +1369,7 @@ func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
                if cur != p.publicPieceState {
                        p.publicPieceState = cur
                        t.pieceStateChanges.Publish(PieceStateChange{
-                               int(piece),
+                               piece,
                                cur,
                        })
                }
@@ -2384,6 +2384,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
        })
        p := t.piece(piece)
        p.numVerifies++
+       p.numVerifiesCond.Broadcast()
        t.cl.event.Broadcast()
        if t.closed.IsSet() {
                return
@@ -2536,9 +2537,13 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        })
 }
 
-func (t *Torrent) tryCreateMorePieceHashers() {
-       for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
+func (t *Torrent) tryCreateMorePieceHashers() error {
+       if t.closed.IsSet() {
+               return errTorrentClosed
        }
+       for t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
+       }
+       return nil
 }
 
 func (t *Torrent) tryCreatePieceHasher() bool {
@@ -2649,10 +2654,15 @@ func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
        }
 }
 
-func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
+func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVerifyCount, err error) {
        piece := t.piece(pieceIndex)
        if !piece.haveHash() {
-               return
+               err = errors.New("piece hash unknown")
+       }
+       targetVerifies = piece.numVerifies + 1
+       if piece.hashing {
+               // The result of this queued piece check will be the one after the current one.
+               targetVerifies++
        }
        if piece.queuedForHash() {
                return
@@ -2660,13 +2670,14 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
        t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
        t.publishPieceStateChange(pieceIndex)
        t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
-       t.tryCreateMorePieceHashers()
+       err = t.tryCreateMorePieceHashers()
+       return
 }
 
 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
 // before the Info is available.
 func (t *Torrent) VerifyData() {
-       for i := pieceIndex(0); i < t.NumPieces(); i++ {
+       for i := 0; i < t.NumPieces(); i++ {
                t.Piece(i).VerifyData()
        }
 }