completedPieces bitmap.Bitmap
// Pieces that need to be hashed.
piecesQueuedForHash bitmap.Bitmap
+ activePieceHashes int
// A pool of piece priorities []int for assignment to new connections.
// These "inclinations" are used to give connections preference for
t.updateWantPeersEvent()
t.pendingRequests = make(map[request]int)
t.lastRequested = make(map[request]*time.Timer)
+ t.tryCreateMorePieceHashers()
}
// Called when metadata for a torrent becomes available.
}
func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
- log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
+ t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, correct).WithValues(debugLogValue))
+ p := t.piece(piece)
+ p.numVerifies++
+ t.cl.event.Broadcast()
if t.closed.IsSet() {
return
}
- p := &t.pieces[piece]
touchers := t.reapPieceTouchers(piece)
if p.storageCompletionOk {
// Don't score the first time a piece is hashed, it could be an
}
}
-func (t *Torrent) verifyPiece(piece pieceIndex) {
- cl := t.cl
- cl.lock()
- defer cl.unlock()
- p := &t.pieces[piece]
- defer func() {
- p.numVerifies++
- cl.event.Broadcast()
- }()
- for p.hashing || t.storage == nil {
- cl.event.Wait()
+func (t *Torrent) tryCreateMorePieceHashers() {
+ for t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
}
- if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
- panic("piece was not queued")
+}
+
+func (t *Torrent) tryCreatePieceHasher() bool {
+ if t.storage == nil {
+ return false
}
- t.updatePiecePriority(piece)
- if t.closed.IsSet() {
- return
+ pi, ok := t.getPieceToHash()
+ if !ok {
+ return false
}
+ p := t.piece(pi)
+ t.piecesQueuedForHash.Remove(pi)
p.hashing = true
- t.publishPieceChange(piece)
- t.updatePiecePriority(piece)
+ t.publishPieceChange(pi)
+ t.updatePiecePriority(pi)
t.storageLock.RLock()
- cl.unlock()
- sum := t.hashPiece(piece)
+ t.activePieceHashes++
+ go t.pieceHasher(pi)
+ return true
+}
+
+func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
+ t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
+ if t.piece(i).hashing {
+ return true
+ }
+ ret = i
+ ok = true
+ return false
+ })
+ return
+}
+
+func (t *Torrent) pieceHasher(index pieceIndex) {
+ p := t.piece(index)
+ sum := t.hashPiece(index)
t.storageLock.RUnlock()
- cl.lock()
+ t.cl.lock()
+ defer t.cl.unlock()
p.hashing = false
- t.updatePiecePriority(piece)
- t.pieceHashed(piece, sum == *p.hash)
- t.publishPieceChange(piece)
+ t.updatePiecePriority(index)
+ t.pieceHashed(index, sum == *p.hash)
+ t.publishPieceChange(index)
+ t.activePieceHashes--
+ t.tryCreateMorePieceHashers()
}
// Return the connections that touched a piece, and clear the entries while
// Currently doesn't really queue, but should in the future.
func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
- piece := &t.pieces[pieceIndex]
+ piece := t.piece(pieceIndex)
if piece.queuedForHash() {
return
}
t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
t.publishPieceChange(pieceIndex)
t.updatePiecePriority(pieceIndex)
- go t.verifyPiece(pieceIndex)
+ t.tryCreateMorePieceHashers()
}
// Forces all the pieces to be re-hashed. See also Piece.VerifyData.