From: Matt Joiner Date: Sat, 19 Jul 2025 11:44:39 +0000 (+1000) Subject: Skip smartban hashing if there's no peers to ban for a piece X-Git-Tag: v1.59.0~7 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=c110aad882c40886efcc4f657e68bb8628e84b64;p=btrtrc.git Skip smartban hashing if there's no peers to ban for a piece Also take smartban.Cache lock only once per piece when forgetting blocks --- diff --git a/iter.go b/iter.go index 350011b1..fc3e8b75 100644 --- a/iter.go +++ b/iter.go @@ -3,13 +3,16 @@ package torrent import ( "iter" - g "github.com/anacrolix/generics" + "golang.org/x/exp/constraints" ) -// Returns Some of the last item in a iter.Seq, or None if the sequence is empty. -func seqLast[V any](seq iter.Seq[V]) (last g.Option[V]) { - for item := range seq { - last.Set(item) +// Returns an iterator that yields integers from start (inclusive) to end (exclusive). +func iterRange[T constraints.Integer](start, end T) iter.Seq[T] { + return func(yield func(T) bool) { + for i := start; i < end; i++ { + if !yield(i) { + return + } + } } - return } diff --git a/smartban.go b/smartban.go index 5515ded5..857ca291 100644 --- a/smartban.go +++ b/smartban.go @@ -25,7 +25,8 @@ type blockCheckingWriter struct { func (me *blockCheckingWriter) checkBlock() { b := me.blockBuffer.Next(me.chunkSize) for _, peer := range me.cache.CheckBlock(me.requestIndex, b) { - g.MakeMapIfNilAndSet(&me.badPeers, peer, struct{}{}) + g.MakeMapIfNil(&me.badPeers) + me.badPeers[peer] = struct{}{} } me.requestIndex++ } diff --git a/smartban/smartban.go b/smartban/smartban.go index 1910d7d5..78425946 100644 --- a/smartban/smartban.go +++ b/smartban/smartban.go @@ -1,6 +1,7 @@ package smartban import ( + "iter" "sync" g "github.com/anacrolix/generics" @@ -9,6 +10,7 @@ import ( type Cache[Peer, BlockKey, Hash comparable] struct { Hash func([]byte) Hash + // Wonder if we should make this an atomic. lock sync.RWMutex blocks map[BlockKey][]peerAndHash[Peer, Hash] } @@ -48,10 +50,30 @@ func (me *Cache[Peer, BlockKey, Hash]) CheckBlock(key BlockKey, data []byte) (ba return } -func (me *Cache[Peer, BlockKey, Hash]) ForgetBlock(key BlockKey) { +func (me *Cache[Peer, BlockKey, Hash]) ForgetBlockSeq(seq iter.Seq[BlockKey]) { me.lock.Lock() defer me.lock.Unlock() - delete(me.blocks, key) + if len(me.blocks) == 0 { + return + } + for key := range seq { + delete(me.blocks, key) + } +} + +// Returns whether any block in the sequence has at least once peer recorded. +func (me *Cache[Peer, BlockKey, Hash]) HasPeerForBlocks(seq iter.Seq[BlockKey]) bool { + me.lock.RLock() + defer me.lock.RUnlock() + if len(me.blocks) == 0 { + return false + } + for key := range seq { + if len(me.blocks[key]) != 0 { + return true + } + } + return false } func (me *Cache[Peer, BlockKey, Hash]) HasBlocks() bool { diff --git a/torrent.go b/torrent.go index 3a909de2..1a0db793 100644 --- a/torrent.go +++ b/torrent.go @@ -1183,14 +1183,18 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { return pp.Integer(t.info.PieceLength) } -func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter { - return &blockCheckingWriter{ +func (t *Torrent) getBlockCheckingWriterForPiece(piece pieceIndex) blockCheckingWriter { + return blockCheckingWriter{ cache: &t.smartBanCache, requestIndex: t.pieceRequestIndexBegin(piece), chunkSize: t.chunkSize.Int(), } } +func (t *Torrent) hasSmartbanDataForPiece(piece pieceIndex) bool { + return t.smartBanCache.HasPeerForBlocks(iterRange(t.pieceRequestIndexBegin(piece), t.pieceRequestIndexBegin(piece+1))) +} + func (t *Torrent) countBytesHashed(n int64) { t.counters.BytesHashed.Add(n) t.cl.counters.BytesHashed.Add(n) @@ -1276,25 +1280,30 @@ func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) ( differingPeers map[bannableAddr]struct{}, err error, ) { + var w io.Writer = h + if t.hasSmartbanDataForPiece(piece) { + smartBanWriter := t.getBlockCheckingWriterForPiece(piece) + w = io.MultiWriter(h, &smartBanWriter) + defer func() { + if err != nil { + // Skip smart banning since we can't blame them for storage issues. A short write would + // ban peers for all recorded blocks that weren't just written. + return + } + // Flush now, even though we may not have finished writing to the piece hash, since + // further data is padding only and should not have come from peers. + smartBanWriter.Flush() + differingPeers = smartBanWriter.badPeers + }() + } p := t.piece(piece) storagePiece := p.Storage() - - smartBanWriter := t.smartBanBlockCheckingWriter(piece) - multiWriter := io.MultiWriter(h, smartBanWriter) - { - var written int64 - written, err = storagePiece.WriteTo(multiWriter) - if err == nil && written != int64(p.length()) { - err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length()) - // Skip smart banning since we can't blame them for storage issues. A short write would - // ban peers for all recorded blocks that weren't just written. - return - } - t.countBytesHashed(written) + var written int64 + written, err = storagePiece.WriteTo(w) + if err == nil && written != int64(p.length()) { + err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length()) } - // Flush before writing padding, since we would not have recorded the padding blocks. - smartBanWriter.Flush() - differingPeers = smartBanWriter.badPeers + t.countBytesHashed(written) return } @@ -2754,9 +2763,7 @@ func (t *Torrent) finishHash(index pieceIndex) { t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index) } t.dropBannedPeers() - for ri := t.pieceRequestIndexBegin(index); ri < t.pieceRequestIndexBegin(index+1); ri++ { - t.smartBanCache.ForgetBlock(ri) - } + t.smartBanCache.ForgetBlockSeq(iterRange(t.pieceRequestIndexBegin(index), t.pieceRequestIndexBegin(index+1))) } p.hashing = false t.pieceHashed(index, correct, copyErr)