From: Matt Joiner Date: Mon, 21 Jul 2025 12:01:38 +0000 (+1000) Subject: Limit piece hashers per client X-Git-Tag: v1.59.0~2^2~96 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=6cdcac807907f5aeaa3f84b144f73bad4c7163c7;p=btrtrc.git Limit piece hashers per client --- diff --git a/client.go b/client.go index de598591..b4857a32 100644 --- a/client.go +++ b/client.go @@ -16,6 +16,7 @@ import ( "net" "net/http" "net/netip" + "runtime" "slices" "strconv" "time" @@ -108,6 +109,8 @@ type Client struct { upnpMappings []*upnpMapping webseedRequestTimer *time.Timer + + activePieceHashers int } type ipStr string @@ -1614,6 +1617,7 @@ func (cl *Client) Torrents() []*Torrent { } func (cl *Client) torrentsAsSlice() (ret []*Torrent) { + ret = make([]*Torrent, 0, len(cl.torrents)) for t := range cl.torrents { ret = append(ret, t) } @@ -1941,3 +1945,38 @@ func (cl *Client) checkConfig() error { } return nil } + +func (cl *Client) maxActivePieceHashers() int { + return runtime.NumCPU() +} + +func (cl *Client) belowMaxActivePieceHashers() bool { + return cl.activePieceHashers < cl.maxActivePieceHashers() +} + +func (cl *Client) canStartPieceHashers() bool { + return cl.belowMaxActivePieceHashers() +} + +func (cl *Client) startPieceHashers() { + if !cl.canStartPieceHashers() { + return + } + ts := make([]*Torrent, 0, len(cl.torrents)) + for t := range cl.torrents { + if !t.considerStartingHashers() { + continue + } + ts = append(ts, t) + } + // Sort largest torrents first, as those are preferred by webseeds, and will cause less thrashing. + slices.SortFunc(ts, func(a, b *Torrent) int { + return -cmp.Compare(a.length(), b.length()) + }) + for _, t := range ts { + t.startPieceHashers() + if !cl.canStartPieceHashers() { + break + } + } +} diff --git a/torrent.go b/torrent.go index bbbbf634..3c0ac745 100644 --- a/torrent.go +++ b/torrent.go @@ -2486,9 +2486,6 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { } func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { - t.logger.LazyLog(log.Debug, func() log.Msg { - return log.Fstr("hashed piece %d (passed=%t)", piece, passed) - }) p := t.piece(piece) p.numVerifies++ p.numVerifiesCond.Broadcast() @@ -2656,31 +2653,31 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { }) } +// Torrent piece hashers are sticky and will try to keep hashing pieces in the same Torrent to keep +// the storage hot. func (t *Torrent) startPieceHashers() error { if t.closed.IsSet() { return errTorrentClosed } - for t.startPieceHasher() { + for t.considerStartingHashers() { + if !t.startSinglePieceHasher() { + break + } } return nil } -func (t *Torrent) startPieceHasher() bool { - if t.storage == nil { - return false - } - if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent { - return false - } +func (t *Torrent) startSinglePieceHasher() bool { pi := t.getPieceToHash() - if pi.Ok { - t.startHash(pi.Value) - go t.pieceHasher(pi.Value) - return true + if !pi.Ok { + return false } - return false + t.startHash(pi.Value) + go t.pieceHasher(pi.Value) + return true } +// Sticky to a Torrent. Might as well since that keeps the storage hot. func (t *Torrent) pieceHasher(initial pieceIndex) { t.finishHash(initial) for { @@ -2693,6 +2690,7 @@ func (t *Torrent) pieceHasher(initial pieceIndex) { t.cl.unlock() t.finishHash(pi) } + t.cl.startPieceHashers() t.cl.unlock() } @@ -2702,9 +2700,10 @@ func (t *Torrent) startHash(pi pieceIndex) { t.deferUpdateComplete() p.hashing = true t.deferPublishPieceStateChange(pi) - t.updatePiecePriority(pi, "Torrent.startPieceHasher") + t.updatePiecePriority(pi, "Torrent.startHash") t.storageLock.RLock() t.activePieceHashes++ + t.cl.activePieceHashers++ } func (t *Torrent) getPieceToHash() (_ g.Option[pieceIndex]) { @@ -2769,6 +2768,7 @@ func (t *Torrent) finishHash(index pieceIndex) { t.pieceHashed(index, correct, copyErr) t.updatePiecePriority(index, "Torrent.finishHash") t.activePieceHashes-- + t.cl.activePieceHashers-- } // Return the connections that touched a piece, and clear the entries while doing it. @@ -3541,3 +3541,19 @@ func (t *Torrent) getClosedErr() error { } return nil } + +func (t *Torrent) considerStartingHashers() bool { + if t.storage == nil { + return false + } + if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent { + return false + } + if !t.cl.canStartPieceHashers() { + return false + } + if t.piecesQueuedForHash.IsEmpty() { + return false + } + return true +}