From 4f69c80d618ad69ced9f27713eb064e6b2df89ba Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 9 Apr 2025 12:27:19 +1000 Subject: [PATCH] Add Piece.VerifyDataContext and improve piece VerifyData concurrency --- go.mod | 2 +- go.sum | 2 ++ piece.go | 50 ++++++++++++++++++++++++++++++++++++-------------- torrent.go | 25 ++++++++++++++++++------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 5597848a..cbd06831 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 github.com/alexflint/go-arg v1.4.3 github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d - github.com/anacrolix/chansync v0.5.1 + github.com/anacrolix/chansync v0.6.0 github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 github.com/anacrolix/envpprof v1.3.0 github.com/anacrolix/fuse v0.2.0 diff --git a/go.sum b/go.sum index 96aec451..882fee6b 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d h1:ypNOsIwvdumNRl github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ= github.com/anacrolix/chansync v0.5.1 h1:j+R9DtotkXm40VFjZ8rJTSJkg2Gv1ldZt8kl96lyJJ0= github.com/anacrolix/chansync v0.5.1/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= +github.com/anacrolix/chansync v0.6.0 h1:/aQVvZ1yLRhmqEYrr9dC92JwzNBQ/SNnFi4uk+fTkQY= +github.com/anacrolix/chansync v0.6.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 h1:8V0K09lrGoeT2KRJNOtspA7q+OMxGwQqK/Ug0IiaaRE= github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444/go.mod h1:MctKM1HS5YYDb3F30NGJxLE+QPuqWoT5ReW/4jt8xew= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= diff --git a/piece.go b/piece.go index 7dd3f61e..66ae2b0c 100644 --- a/piece.go +++ b/piece.go @@ -1,6 +1,7 @@ package torrent import ( + "context" "errors" "fmt" "sync" @@ -15,6 +16,9 @@ import ( "github.com/anacrolix/torrent/storage" ) +// Why is it an int64? +type pieceVerifyCount = int64 + type Piece struct { // The completed piece SHA1 hash, from the metainfo "pieces" field. Nil if the info is not V1 // compatible. @@ -26,7 +30,8 @@ type Piece struct { readerCond chansync.BroadcastCond - numVerifies int64 + numVerifies pieceVerifyCount + numVerifiesCond chansync.BroadcastCond hashing bool marking bool storageCompletionOk bool @@ -177,23 +182,40 @@ func (p *Piece) bytesLeft() (ret pp.Integer) { } // Forces the piece data to be rehashed. -func (p *Piece) VerifyData() { - p.t.cl.lock() - defer p.t.cl.unlock() - target := p.numVerifies + 1 - if p.hashing { - target++ +func (p *Piece) VerifyData() error { + return p.VerifyDataContext(context.Background()) +} + +// Forces the piece data to be rehashed. This might be a temporary method until +// an event-based one is created. Possibly this blocking style is more suited to +// external control of hashing concurrency. +func (p *Piece) VerifyDataContext(ctx context.Context) error { + locker := p.t.cl.locker() + locker.Lock() + target, err := p.t.queuePieceCheck(p.index) + locker.Unlock() + if err != nil { + return err } - // log.Printf("target: %d", target) - p.t.queuePieceCheck(p.index) + //log.Printf("target: %d", target) for { - // log.Printf("got %d verifies", p.numVerifies) - if p.numVerifies >= target { - break + locker.RLock() + done := p.numVerifies >= target + //log.Printf("got %d verifies", p.numVerifies) + numVerifiesChanged := p.numVerifiesCond.Signaled() + locker.RUnlock() + if done { + //log.Print("done") + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.t.closed.Done(): + return errTorrentClosed + case <-numVerifiesChanged: } - p.t.cl.event.Wait() } - // log.Print("done") } func (p *Piece) queuedForHash() bool { diff --git a/torrent.go b/torrent.go index 3154662d..f8cfec77 100644 --- a/torrent.go +++ b/torrent.go @@ -1369,7 +1369,7 @@ func (t *Torrent) publishPieceStateChange(piece pieceIndex) { if cur != p.publicPieceState { p.publicPieceState = cur t.pieceStateChanges.Publish(PieceStateChange{ - int(piece), + piece, cur, }) } @@ -2384,6 +2384,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { }) p := t.piece(piece) p.numVerifies++ + p.numVerifiesCond.Broadcast() t.cl.event.Broadcast() if t.closed.IsSet() { return @@ -2536,9 +2537,13 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { }) } -func (t *Torrent) tryCreateMorePieceHashers() { - for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() { +func (t *Torrent) tryCreateMorePieceHashers() error { + if t.closed.IsSet() { + return errTorrentClosed } + for t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() { + } + return nil } func (t *Torrent) tryCreatePieceHasher() bool { @@ -2649,10 +2654,15 @@ func (t *Torrent) queueInitialPieceCheck(i pieceIndex) { } } -func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { +func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVerifyCount, err error) { piece := t.piece(pieceIndex) if !piece.haveHash() { - return + err = errors.New("piece hash unknown") + } + targetVerifies = piece.numVerifies + 1 + if piece.hashing { + // The result of this queued piece check will be the one after the current one. + targetVerifies++ } if piece.queuedForHash() { return @@ -2660,13 +2670,14 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) t.publishPieceStateChange(pieceIndex) t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck") - t.tryCreateMorePieceHashers() + err = t.tryCreateMorePieceHashers() + return } // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called // before the Info is available. func (t *Torrent) VerifyData() { - for i := pieceIndex(0); i < t.NumPieces(); i++ { + for i := 0; i < t.NumPieces(); i++ { t.Piece(i).VerifyData() } } -- 2.48.1