From: Matt Joiner Date: Fri, 23 May 2025 04:38:50 +0000 (+1000) Subject: Rearrange hashing so goroutines are reused X-Git-Tag: v1.59.0~124 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=26f1aa192bc6edd8edd4b91a495ba07ee6a3d695;p=btrtrc.git Rearrange hashing so goroutines are reused --- diff --git a/piece.go b/piece.go index 0b9e51cb..78f86b15 100644 --- a/piece.go +++ b/piece.go @@ -227,7 +227,7 @@ func (p *Piece) VerifyDataContext(ctx context.Context) error { } func (p *Piece) queuedForHash() bool { - return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index)) + return p.t.piecesQueuedForHash.Contains(p.index) } func (p *Piece) torrentBeginOffset() int64 { diff --git a/torrent.go b/torrent.go index 5793aa46..b2c5041d 100644 --- a/torrent.go +++ b/torrent.go @@ -163,7 +163,7 @@ type Torrent struct { // A cache of completed piece indices. _completedPieces roaring.Bitmap // Pieces that need to be hashed. - piecesQueuedForHash bitmap.Bitmap + piecesQueuedForHash typedRoaring.Bitmap[pieceIndex] activePieceHashes int connsWithAllPieces map[*Peer]struct{} @@ -568,7 +568,7 @@ func (t *Torrent) onSetInfo() { close(t.gotMetainfoC) t.updateWantPeersEvent() t.requestState = make(map[RequestIndex]requestState) - t.tryCreateMorePieceHashers() + panicif.Err(t.startPieceHashers()) t.iterPeers(func(p *Peer) { p.onGotInfo(t.info) p.onNeedUpdateRequests("onSetInfo") @@ -1195,7 +1195,7 @@ func (t *Torrent) hashPiece(piece pieceIndex) ( } correct = sum == *p.hash // Can't do smart banning without reading the piece. The smartBanCache is still cleared - // in pieceHasher regardless. + // in finishHash regardless. return } h := pieceHash.New() @@ -2612,44 +2612,64 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { }) } -func (t *Torrent) tryCreateMorePieceHashers() error { +func (t *Torrent) startPieceHashers() error { if t.closed.IsSet() { return errTorrentClosed } - for t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() { + for t.startPieceHasher() { } return nil } -func (t *Torrent) tryCreatePieceHasher() bool { +func (t *Torrent) startPieceHasher() bool { if t.storage == nil { return false } - pi, ok := t.getPieceToHash() - if !ok { + if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent { return false } + pi := t.getPieceToHash() + if pi.Ok { + t.startHash(pi.Value) + go t.pieceHasher(pi.Value) + return true + } + return false +} + +func (t *Torrent) pieceHasher(initial pieceIndex) { + t.finishHash(initial) + for { + piOpt := t.getPieceToHash() + if !piOpt.Ok { + break + } + pi := piOpt.Value + t.startHash(pi) + t.cl.unlock() + t.finishHash(pi) + } + t.cl.unlock() +} + +func (t *Torrent) startHash(pi pieceIndex) { p := t.piece(pi) - t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi)) + t.piecesQueuedForHash.Remove(pi) t.deferUpdateComplete() p.hashing = true t.publishPieceStateChange(pi) - t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher") + t.updatePiecePriority(pi, "Torrent.startPieceHasher") t.storageLock.RLock() t.activePieceHashes++ - go t.pieceHasher(pi) - return true } -func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) { - t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool { +func (t *Torrent) getPieceToHash() (_ g.Option[pieceIndex]) { + for i := range t.piecesQueuedForHash.Iterate { if t.piece(i).hashing { - return true + continue } - ret = i - ok = true - return false - }) + return g.Some(i) + } return } @@ -2676,7 +2696,9 @@ func (t *Torrent) dropBannedPeers() { }) } -func (t *Torrent) pieceHasher(index pieceIndex) { +// Storage lock is held. Release storage lock after we're done reading and relock Client. Return +// with Client lock still held. +func (t *Torrent) finishHash(index pieceIndex) { p := t.piece(index) // Do we really need to spell out that it's a copy error? If it's a failure to hash the hash // will just be wrong. @@ -2690,7 +2712,6 @@ func (t *Torrent) pieceHasher(index pieceIndex) { } t.storageLock.RUnlock() t.cl.lock() - defer t.cl.unlock() if correct { for peer := range failedPeers { t.cl.banPeerIP(peer.AsSlice()) @@ -2703,9 +2724,8 @@ func (t *Torrent) pieceHasher(index pieceIndex) { } p.hashing = false t.pieceHashed(index, correct, copyErr) - t.updatePiecePriority(index, "Torrent.pieceHasher") + t.updatePiecePriority(index, "Torrent.finishHash") t.activePieceHashes-- - t.tryCreateMorePieceHashers() } // Return the connections that touched a piece, and clear the entries while doing it. @@ -2748,11 +2768,11 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) (targetVerifies pieceVe if piece.queuedForHash() { return } - t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) + t.piecesQueuedForHash.Add(pieceIndex) t.deferUpdateComplete() t.publishPieceStateChange(pieceIndex) t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck") - err = t.tryCreateMorePieceHashers() + err = t.startPieceHashers() return } @@ -2864,7 +2884,7 @@ func (t *Torrent) hashingPiece(i pieceIndex) bool { } func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool { - return t.piecesQueuedForHash.Get(bitmap.BitIndex(i)) + return t.piecesQueuedForHash.Contains(i) } func (t *Torrent) dialTimeout() time.Duration {