// A cache of completed piece indices.
_completedPieces roaring.Bitmap
// Pieces that need to be hashed.
- piecesQueuedForHash bitmap.Bitmap
+ piecesQueuedForHash typedRoaring.Bitmap[pieceIndex]
activePieceHashes int
connsWithAllPieces map[*Peer]struct{}
close(t.gotMetainfoC)
t.updateWantPeersEvent()
t.requestState = make(map[RequestIndex]requestState)
- t.tryCreateMorePieceHashers()
+ panicif.Err(t.startPieceHashers())
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
p.onNeedUpdateRequests("onSetInfo")
}
correct = sum == *p.hash
// Can't do smart banning without reading the piece. The smartBanCache is still cleared
- // in pieceHasher regardless.
+ // in finishHash regardless.
return
}
h := pieceHash.New()
})
}
-func (t *Torrent) tryCreateMorePieceHashers() error {
+func (t *Torrent) startPieceHashers() error {
if t.closed.IsSet() {
return errTorrentClosed
}
- for t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
+ for t.startPieceHasher() {
}
return nil
}
-func (t *Torrent) tryCreatePieceHasher() bool {
+func (t *Torrent) startPieceHasher() bool {
if t.storage == nil {
return false
}
- pi, ok := t.getPieceToHash()
- if !ok {
+ if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent {
return false
}
+ pi := t.getPieceToHash()
+ if pi.Ok {
+ t.startHash(pi.Value)
+ go t.pieceHasher(pi.Value)
+ return true
+ }
+ return false
+}
+
+func (t *Torrent) pieceHasher(initial pieceIndex) {
+ t.finishHash(initial)
+ for {
+ piOpt := t.getPieceToHash()
+ if !piOpt.Ok {
+ break
+ }
+ pi := piOpt.Value
+ t.startHash(pi)
+ t.cl.unlock()
+ t.finishHash(pi)
+ }
+ t.cl.unlock()
+}
+
+func (t *Torrent) startHash(pi pieceIndex) {
p := t.piece(pi)
- t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
+ t.piecesQueuedForHash.Remove(pi)
t.deferUpdateComplete()
p.hashing = true
t.publishPieceStateChange(pi)
- t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
+ t.updatePiecePriority(pi, "Torrent.startPieceHasher")
t.storageLock.RLock()
t.activePieceHashes++
- go t.pieceHasher(pi)
- return true
}
-func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
- t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
+func (t *Torrent) getPieceToHash() (_ g.Option[pieceIndex]) {
+ for i := range t.piecesQueuedForHash.Iterate {
if t.piece(i).hashing {
- return true
+ continue
}
- ret = i
- ok = true
- return false
- })
+ return g.Some(i)
+ }
return
}
})
}
-func (t *Torrent) pieceHasher(index pieceIndex) {
+// Storage lock is held. Release storage lock after we're done reading and relock Client. Return
+// with Client lock still held.
+func (t *Torrent) finishHash(index pieceIndex) {
p := t.piece(index)
// Do we really need to spell out that it's a copy error? If it's a failure to hash the hash
// will just be wrong.
}
t.storageLock.RUnlock()
t.cl.lock()
- defer t.cl.unlock()
if correct {
for peer := range failedPeers {
t.cl.banPeerIP(peer.AsSlice())
}
p.hashing = false
t.pieceHashed(index, correct, copyErr)
- t.updatePiecePriority(index, "Torrent.pieceHasher")
+ t.updatePiecePriority(index, "Torrent.finishHash")
t.activePieceHashes--
- t.tryCreateMorePieceHashers()
}
// Return the connections that touched a piece, and clear the entries while doing it.
if piece.queuedForHash() {
return
}
- t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
+ t.piecesQueuedForHash.Add(pieceIndex)
t.deferUpdateComplete()
t.publishPieceStateChange(pieceIndex)
t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
- err = t.tryCreateMorePieceHashers()
+ err = t.startPieceHashers()
return
}
}
func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
- return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
+ return t.piecesQueuedForHash.Contains(i)
}
func (t *Torrent) dialTimeout() time.Duration {