]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rearrange hashing so goroutines are reused
authorMatt Joiner <anacrolix@gmail.com>
Fri, 23 May 2025 04:38:50 +0000 (14:38 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 23 May 2025 04:38:50 +0000 (14:38 +1000)
piece.go
torrent.go

index 0b9e51cb85a8ba43c1f631e0f8285e000587fd9e..78f86b15fbb5bbe08c299061f695e05171aa026a 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -227,7 +227,7 @@ func (p *Piece) VerifyDataContext(ctx context.Context) error {
 }
 
 func (p *Piece) queuedForHash() bool {
-       return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index))
+       return p.t.piecesQueuedForHash.Contains(p.index)
 }
 
 func (p *Piece) torrentBeginOffset() int64 {
index 5793aa460bb7246b81060fc0324b176e54e0f353..b2c5041db6d14d0b4f71114067879a54f654198d 100644 (file)
@@ -163,7 +163,7 @@ type Torrent struct {
        // A cache of completed piece indices.
        _completedPieces roaring.Bitmap
        // Pieces that need to be hashed.
-       piecesQueuedForHash bitmap.Bitmap
+       piecesQueuedForHash typedRoaring.Bitmap[pieceIndex]
        activePieceHashes   int
 
        connsWithAllPieces map[*Peer]struct{}
@@ -568,7 +568,7 @@ func (t *Torrent) onSetInfo() {
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
        t.requestState = make(map[RequestIndex]requestState)
-       t.tryCreateMorePieceHashers()
+       panicif.Err(t.startPieceHashers())
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
                p.onNeedUpdateRequests("onSetInfo")
@@ -1195,7 +1195,7 @@ func (t *Torrent) hashPiece(piece pieceIndex) (
                        }
                        correct = sum == *p.hash
                        // Can't do smart banning without reading the piece. The smartBanCache is still cleared
-                       // in pieceHasher regardless.
+                       // in finishHash regardless.
                        return
                }
                h := pieceHash.New()
@@ -2612,44 +2612,64 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        })
 }
 
-func (t *Torrent) tryCreateMorePieceHashers() error {
+func (t *Torrent) startPieceHashers() error {
        if t.closed.IsSet() {
                return errTorrentClosed
        }
-       for t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
+       for t.startPieceHasher() {
        }
        return nil
 }
 
-func (t *Torrent) tryCreatePieceHasher() bool {
+func (t *Torrent) startPieceHasher() bool {
        if t.storage == nil {
                return false
        }
-       pi, ok := t.getPieceToHash()
-       if !ok {
+       if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent {
                return false
        }
+       pi := t.getPieceToHash()
+       if pi.Ok {
+               t.startHash(pi.Value)
+               go t.pieceHasher(pi.Value)
+               return true
+       }
+       return false
+}
+
+func (t *Torrent) pieceHasher(initial pieceIndex) {
+       t.finishHash(initial)
+       for {
+               piOpt := t.getPieceToHash()
+               if !piOpt.Ok {
+                       break
+               }
+               pi := piOpt.Value
+               t.startHash(pi)
+               t.cl.unlock()
+               t.finishHash(pi)
+       }
+       t.cl.unlock()
+}
+
+func (t *Torrent) startHash(pi pieceIndex) {
        p := t.piece(pi)
-       t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
+       t.piecesQueuedForHash.Remove(pi)
        t.deferUpdateComplete()
        p.hashing = true
        t.publishPieceStateChange(pi)
-       t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
+       t.updatePiecePriority(pi, "Torrent.startPieceHasher")
        t.storageLock.RLock()
        t.activePieceHashes++
-       go t.pieceHasher(pi)
-       return true
 }
 
-func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
-       t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
+func (t *Torrent) getPieceToHash() (_ g.Option[pieceIndex]) {
+       for i := range t.piecesQueuedForHash.Iterate {
                if t.piece(i).hashing {
-                       return true
+                       continue
                }
-               ret = i
-               ok = true
-               return false
-       })
+               return g.Some(i)
+       }
        return
 }
 
@@ -2676,7 +2696,9 @@ func (t *Torrent) dropBannedPeers() {
        })
 }
 
-func (t *Torrent) pieceHasher(index pieceIndex) {
+// Storage lock is held. Release storage lock after we're done reading and relock Client. Return
+// with Client lock still held.
+func (t *Torrent) finishHash(index pieceIndex) {
        p := t.piece(index)
        // Do we really need to spell out that it's a copy error? If it's a failure to hash the hash
        // will just be wrong.
@@ -2690,7 +2712,6 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
        }
        t.storageLock.RUnlock()
        t.cl.lock()
-       defer t.cl.unlock()
        if correct {
                for peer := range failedPeers {
                        t.cl.banPeerIP(peer.AsSlice())
@@ -2703,9 +2724,8 @@ func (t *Torrent) pieceHasher(index pieceIndex) {
        }
        p.hashing = false
        t.pieceHashed(index, correct, copyErr)
-       t.updatePiecePriority(index, "Torrent.pieceHasher")
+       t.updatePiecePriority(index, "Torrent.finishHash")
        t.activePieceHashes--
-       t.tryCreateMorePieceHashers()
 }
 
 // Return the connections that touched a piece, and clear the entries while doing it.
@@ -2748,11 +2768,11 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVe
        if piece.queuedForHash() {
                return
        }
-       t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
+       t.piecesQueuedForHash.Add(pieceIndex)
        t.deferUpdateComplete()
        t.publishPieceStateChange(pieceIndex)
        t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
-       err = t.tryCreateMorePieceHashers()
+       err = t.startPieceHashers()
        return
 }
 
@@ -2864,7 +2884,7 @@ func (t *Torrent) hashingPiece(i pieceIndex) bool {
 }
 
 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
-       return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
+       return t.piecesQueuedForHash.Contains(i)
 }
 
 func (t *Torrent) dialTimeout() time.Duration {