]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Limit piece hashers per client
authorMatt Joiner <anacrolix@gmail.com>
Mon, 21 Jul 2025 12:01:38 +0000 (22:01 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 21 Jul 2025 12:01:38 +0000 (22:01 +1000)
client.go
torrent.go

index de598591141ecc97c7f27aff5e3e8091f6e9da3a..b4857a32ac058583d854bf21f88859bffe00b7b9 100644 (file)
--- a/client.go
+++ b/client.go
@@ -16,6 +16,7 @@ import (
        "net"
        "net/http"
        "net/netip"
+       "runtime"
        "slices"
        "strconv"
        "time"
@@ -108,6 +109,8 @@ type Client struct {
        upnpMappings []*upnpMapping
 
        webseedRequestTimer *time.Timer
+
+       activePieceHashers int
 }
 
 type ipStr string
@@ -1614,6 +1617,7 @@ func (cl *Client) Torrents() []*Torrent {
 }
 
 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
+       ret = make([]*Torrent, 0, len(cl.torrents))
        for t := range cl.torrents {
                ret = append(ret, t)
        }
@@ -1941,3 +1945,38 @@ func (cl *Client) checkConfig() error {
        }
        return nil
 }
+
+func (cl *Client) maxActivePieceHashers() int {
+       return runtime.NumCPU()
+}
+
+func (cl *Client) belowMaxActivePieceHashers() bool {
+       return cl.activePieceHashers < cl.maxActivePieceHashers()
+}
+
+func (cl *Client) canStartPieceHashers() bool {
+       return cl.belowMaxActivePieceHashers()
+}
+
+func (cl *Client) startPieceHashers() {
+       if !cl.canStartPieceHashers() {
+               return
+       }
+       ts := make([]*Torrent, 0, len(cl.torrents))
+       for t := range cl.torrents {
+               if !t.considerStartingHashers() {
+                       continue
+               }
+               ts = append(ts, t)
+       }
+       // Sort largest torrents first, as those are preferred by webseeds, and will cause less thrashing.
+       slices.SortFunc(ts, func(a, b *Torrent) int {
+               return -cmp.Compare(a.length(), b.length())
+       })
+       for _, t := range ts {
+               t.startPieceHashers()
+               if !cl.canStartPieceHashers() {
+                       break
+               }
+       }
+}
index bbbbf63459029b3eaf8176696df567dc2c92aac3..3c0ac74550364218a6443c3b6d68dd64e7ce9473 100644 (file)
@@ -2486,9 +2486,6 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 }
 
 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
-       t.logger.LazyLog(log.Debug, func() log.Msg {
-               return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
-       })
        p := t.piece(piece)
        p.numVerifies++
        p.numVerifiesCond.Broadcast()
@@ -2656,31 +2653,31 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        })
 }
 
+// Torrent piece hashers are sticky and will try to keep hashing pieces in the same Torrent to keep
+// the storage hot.
 func (t *Torrent) startPieceHashers() error {
        if t.closed.IsSet() {
                return errTorrentClosed
        }
-       for t.startPieceHasher() {
+       for t.considerStartingHashers() {
+               if !t.startSinglePieceHasher() {
+                       break
+               }
        }
        return nil
 }
 
-func (t *Torrent) startPieceHasher() bool {
-       if t.storage == nil {
-               return false
-       }
-       if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent {
-               return false
-       }
+func (t *Torrent) startSinglePieceHasher() bool {
        pi := t.getPieceToHash()
-       if pi.Ok {
-               t.startHash(pi.Value)
-               go t.pieceHasher(pi.Value)
-               return true
+       if !pi.Ok {
+               return false
        }
-       return false
+       t.startHash(pi.Value)
+       go t.pieceHasher(pi.Value)
+       return true
 }
 
+// Sticky to a Torrent. Might as well since that keeps the storage hot.
 func (t *Torrent) pieceHasher(initial pieceIndex) {
        t.finishHash(initial)
        for {
@@ -2693,6 +2690,7 @@ func (t *Torrent) pieceHasher(initial pieceIndex) {
                t.cl.unlock()
                t.finishHash(pi)
        }
+       t.cl.startPieceHashers()
        t.cl.unlock()
 }
 
@@ -2702,9 +2700,10 @@ func (t *Torrent) startHash(pi pieceIndex) {
        t.deferUpdateComplete()
        p.hashing = true
        t.deferPublishPieceStateChange(pi)
-       t.updatePiecePriority(pi, "Torrent.startPieceHasher")
+       t.updatePiecePriority(pi, "Torrent.startHash")
        t.storageLock.RLock()
        t.activePieceHashes++
+       t.cl.activePieceHashers++
 }
 
 func (t *Torrent) getPieceToHash() (_ g.Option[pieceIndex]) {
@@ -2769,6 +2768,7 @@ func (t *Torrent) finishHash(index pieceIndex) {
        t.pieceHashed(index, correct, copyErr)
        t.updatePiecePriority(index, "Torrent.finishHash")
        t.activePieceHashes--
+       t.cl.activePieceHashers--
 }
 
 // Return the connections that touched a piece, and clear the entries while doing it.
@@ -3541,3 +3541,19 @@ func (t *Torrent) getClosedErr() error {
        }
        return nil
 }
+
+func (t *Torrent) considerStartingHashers() bool {
+       if t.storage == nil {
+               return false
+       }
+       if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent {
+               return false
+       }
+       if !t.cl.canStartPieceHashers() {
+               return false
+       }
+       if t.piecesQueuedForHash.IsEmpty() {
+               return false
+       }
+       return true
+}