]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Restrict the number of concurrent piece hashes
authorMatt Joiner <anacrolix@gmail.com>
Wed, 21 Aug 2019 03:31:33 +0000 (13:31 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 22 Aug 2019 00:17:06 +0000 (10:17 +1000)
This fixes bad behaviour where running out of file descriptors, and overloading the system with goroutines and concurrent I/O may cause newly started torrents to fail to hash data.

torrent.go

index 02ee63af9d4b21ae1f12ca4124162a70b1b88d81..2fdd6a52bf1d9a4a14fcb400e5164d3ff032437a 100644 (file)
@@ -131,6 +131,7 @@ type Torrent struct {
        completedPieces bitmap.Bitmap
        // Pieces that need to be hashed.
        piecesQueuedForHash bitmap.Bitmap
+       activePieceHashes   int
 
        // A pool of piece priorities []int for assignment to new connections.
        // These "inclinations" are used to give connections preference for
@@ -382,6 +383,7 @@ func (t *Torrent) onSetInfo() {
        t.updateWantPeersEvent()
        t.pendingRequests = make(map[request]int)
        t.lastRequested = make(map[request]*time.Timer)
+       t.tryCreateMorePieceHashers()
 }
 
 // Called when metadata for a torrent becomes available.
@@ -1503,11 +1505,13 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 }
 
 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
-       log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
+       t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, correct).WithValues(debugLogValue))
+       p := t.piece(piece)
+       p.numVerifies++
+       t.cl.event.Broadcast()
        if t.closed.IsSet() {
                return
        }
-       p := &t.pieces[piece]
        touchers := t.reapPieceTouchers(piece)
        if p.storageCompletionOk {
                // Don't score the first time a piece is hashed, it could be an
@@ -1602,37 +1606,54 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        }
 }
 
-func (t *Torrent) verifyPiece(piece pieceIndex) {
-       cl := t.cl
-       cl.lock()
-       defer cl.unlock()
-       p := &t.pieces[piece]
-       defer func() {
-               p.numVerifies++
-               cl.event.Broadcast()
-       }()
-       for p.hashing || t.storage == nil {
-               cl.event.Wait()
+func (t *Torrent) tryCreateMorePieceHashers() {
+       for t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
        }
-       if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
-               panic("piece was not queued")
+}
+
+func (t *Torrent) tryCreatePieceHasher() bool {
+       if t.storage == nil {
+               return false
        }
-       t.updatePiecePriority(piece)
-       if t.closed.IsSet() {
-               return
+       pi, ok := t.getPieceToHash()
+       if !ok {
+               return false
        }
+       p := t.piece(pi)
+       t.piecesQueuedForHash.Remove(pi)
        p.hashing = true
-       t.publishPieceChange(piece)
-       t.updatePiecePriority(piece)
+       t.publishPieceChange(pi)
+       t.updatePiecePriority(pi)
        t.storageLock.RLock()
-       cl.unlock()
-       sum := t.hashPiece(piece)
+       t.activePieceHashes++
+       go t.pieceHasher(pi)
+       return true
+}
+
+func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
+       t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
+               if t.piece(i).hashing {
+                       return true
+               }
+               ret = i
+               ok = true
+               return false
+       })
+       return
+}
+
+func (t *Torrent) pieceHasher(index pieceIndex) {
+       p := t.piece(index)
+       sum := t.hashPiece(index)
        t.storageLock.RUnlock()
-       cl.lock()
+       t.cl.lock()
+       defer t.cl.unlock()
        p.hashing = false
-       t.updatePiecePriority(piece)
-       t.pieceHashed(piece, sum == *p.hash)
-       t.publishPieceChange(piece)
+       t.updatePiecePriority(index)
+       t.pieceHashed(index, sum == *p.hash)
+       t.publishPieceChange(index)
+       t.activePieceHashes--
+       t.tryCreateMorePieceHashers()
 }
 
 // Return the connections that touched a piece, and clear the entries while
@@ -1655,14 +1676,14 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {
 
 // Currently doesn't really queue, but should in the future.
 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
-       piece := &t.pieces[pieceIndex]
+       piece := t.piece(pieceIndex)
        if piece.queuedForHash() {
                return
        }
        t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
        t.publishPieceChange(pieceIndex)
        t.updatePiecePriority(pieceIndex)
-       go t.verifyPiece(pieceIndex)
+       t.tryCreateMorePieceHashers()
 }
 
 // Forces all the pieces to be re-hashed. See also Piece.VerifyData.