From 4850ce6ab330babdb25af4c70cfe72c76b4321fb Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 21 Aug 2019 13:31:33 +1000 Subject: [PATCH] Restrict the number of concurrent piece hashes This fixes bad behaviour where running out of file descriptors, and overloading the system with goroutines and concurrent I/O may cause newly started torrents to fail to hash data. --- torrent.go | 77 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/torrent.go b/torrent.go index 02ee63af..2fdd6a52 100644 --- a/torrent.go +++ b/torrent.go @@ -131,6 +131,7 @@ type Torrent struct { completedPieces bitmap.Bitmap // Pieces that need to be hashed. piecesQueuedForHash bitmap.Bitmap + activePieceHashes int // A pool of piece priorities []int for assignment to new connections. // These "inclinations" are used to give connections preference for @@ -382,6 +383,7 @@ func (t *Torrent) onSetInfo() { t.updateWantPeersEvent() t.pendingRequests = make(map[request]int) t.lastRequested = make(map[request]*time.Timer) + t.tryCreateMorePieceHashers() } // Called when metadata for a torrent becomes available. @@ -1503,11 +1505,13 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { } func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { - log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger) + t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, correct).WithValues(debugLogValue)) + p := t.piece(piece) + p.numVerifies++ + t.cl.event.Broadcast() if t.closed.IsSet() { return } - p := &t.pieces[piece] touchers := t.reapPieceTouchers(piece) if p.storageCompletionOk { // Don't score the first time a piece is hashed, it could be an @@ -1602,37 +1606,54 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { } } -func (t *Torrent) verifyPiece(piece pieceIndex) { - cl := t.cl - cl.lock() - defer cl.unlock() - p := &t.pieces[piece] - defer func() { - p.numVerifies++ - cl.event.Broadcast() - }() - for p.hashing || t.storage == nil { - cl.event.Wait() +func (t *Torrent) tryCreateMorePieceHashers() { + for t.activePieceHashes < 2 && t.tryCreatePieceHasher() { } - if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) { - panic("piece was not queued") +} + +func (t *Torrent) tryCreatePieceHasher() bool { + if t.storage == nil { + return false } - t.updatePiecePriority(piece) - if t.closed.IsSet() { - return + pi, ok := t.getPieceToHash() + if !ok { + return false } + p := t.piece(pi) + t.piecesQueuedForHash.Remove(pi) p.hashing = true - t.publishPieceChange(piece) - t.updatePiecePriority(piece) + t.publishPieceChange(pi) + t.updatePiecePriority(pi) t.storageLock.RLock() - cl.unlock() - sum := t.hashPiece(piece) + t.activePieceHashes++ + go t.pieceHasher(pi) + return true +} + +func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) { + t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool { + if t.piece(i).hashing { + return true + } + ret = i + ok = true + return false + }) + return +} + +func (t *Torrent) pieceHasher(index pieceIndex) { + p := t.piece(index) + sum := t.hashPiece(index) t.storageLock.RUnlock() - cl.lock() + t.cl.lock() + defer t.cl.unlock() p.hashing = false - t.updatePiecePriority(piece) - t.pieceHashed(piece, sum == *p.hash) - t.publishPieceChange(piece) + t.updatePiecePriority(index) + t.pieceHashed(index, sum == *p.hash) + t.publishPieceChange(index) + t.activePieceHashes-- + t.tryCreateMorePieceHashers() } // Return the connections that touched a piece, and clear the entries while @@ -1655,14 +1676,14 @@ func (t *Torrent) connsAsSlice() (ret []*connection) { // Currently doesn't really queue, but should in the future. func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { - piece := &t.pieces[pieceIndex] + piece := t.piece(pieceIndex) if piece.queuedForHash() { return } t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) t.publishPieceChange(pieceIndex) t.updatePiecePriority(pieceIndex) - go t.verifyPiece(pieceIndex) + t.tryCreateMorePieceHashers() } // Forces all the pieces to be re-hashed. See also Piece.VerifyData. -- 2.48.1