]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Skip smartban hashing if there's no peers to ban for a piece
authorMatt Joiner <anacrolix@gmail.com>
Sat, 19 Jul 2025 11:44:39 +0000 (21:44 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 19 Jul 2025 11:44:46 +0000 (21:44 +1000)
Also take smartban.Cache lock only once per piece when forgetting blocks

iter.go
smartban.go
smartban/smartban.go
torrent.go

diff --git a/iter.go b/iter.go
index 350011b1b59f6fd697815c507376e453be4457dc..fc3e8b752125bc9836fed6927d23b724178f3b7e 100644 (file)
--- a/iter.go
+++ b/iter.go
@@ -3,13 +3,16 @@ package torrent
 import (
        "iter"
 
-       g "github.com/anacrolix/generics"
+       "golang.org/x/exp/constraints"
 )
 
-// Returns Some of the last item in a iter.Seq, or None if the sequence is empty.
-func seqLast[V any](seq iter.Seq[V]) (last g.Option[V]) {
-       for item := range seq {
-               last.Set(item)
+// Returns an iterator that yields integers from start (inclusive) to end (exclusive).
+func iterRange[T constraints.Integer](start, end T) iter.Seq[T] {
+       return func(yield func(T) bool) {
+               for i := start; i < end; i++ {
+                       if !yield(i) {
+                               return
+                       }
+               }
        }
-       return
 }
index 5515ded51fa3ef94f145a57439ef9e877dee6719..857ca2914d3cbd97ddcca9995bc7b491a44461d7 100644 (file)
@@ -25,7 +25,8 @@ type blockCheckingWriter struct {
 func (me *blockCheckingWriter) checkBlock() {
        b := me.blockBuffer.Next(me.chunkSize)
        for _, peer := range me.cache.CheckBlock(me.requestIndex, b) {
-               g.MakeMapIfNilAndSet(&me.badPeers, peer, struct{}{})
+               g.MakeMapIfNil(&me.badPeers)
+               me.badPeers[peer] = struct{}{}
        }
        me.requestIndex++
 }
index 1910d7d5a2ff4027160cebafb586eeded4e94542..78425946de0c6351d26b990df8e193987369e7dc 100644 (file)
@@ -1,6 +1,7 @@
 package smartban
 
 import (
+       "iter"
        "sync"
 
        g "github.com/anacrolix/generics"
@@ -9,6 +10,7 @@ import (
 type Cache[Peer, BlockKey, Hash comparable] struct {
        Hash func([]byte) Hash
 
+       // Wonder if we should make this an atomic.
        lock   sync.RWMutex
        blocks map[BlockKey][]peerAndHash[Peer, Hash]
 }
@@ -48,10 +50,30 @@ func (me *Cache[Peer, BlockKey, Hash]) CheckBlock(key BlockKey, data []byte) (ba
        return
 }
 
-func (me *Cache[Peer, BlockKey, Hash]) ForgetBlock(key BlockKey) {
+func (me *Cache[Peer, BlockKey, Hash]) ForgetBlockSeq(seq iter.Seq[BlockKey]) {
        me.lock.Lock()
        defer me.lock.Unlock()
-       delete(me.blocks, key)
+       if len(me.blocks) == 0 {
+               return
+       }
+       for key := range seq {
+               delete(me.blocks, key)
+       }
+}
+
+// Returns whether any block in the sequence has at least once peer recorded.
+func (me *Cache[Peer, BlockKey, Hash]) HasPeerForBlocks(seq iter.Seq[BlockKey]) bool {
+       me.lock.RLock()
+       defer me.lock.RUnlock()
+       if len(me.blocks) == 0 {
+               return false
+       }
+       for key := range seq {
+               if len(me.blocks[key]) != 0 {
+                       return true
+               }
+       }
+       return false
 }
 
 func (me *Cache[Peer, BlockKey, Hash]) HasBlocks() bool {
index 3a909de2eca1aca85d8645aed14c7e148f42cc7d..1a0db793d40a8eec65160f8d928d307b1153d120 100644 (file)
@@ -1183,14 +1183,18 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
        return pp.Integer(t.info.PieceLength)
 }
 
-func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
-       return &blockCheckingWriter{
+func (t *Torrent) getBlockCheckingWriterForPiece(piece pieceIndex) blockCheckingWriter {
+       return blockCheckingWriter{
                cache:        &t.smartBanCache,
                requestIndex: t.pieceRequestIndexBegin(piece),
                chunkSize:    t.chunkSize.Int(),
        }
 }
 
+func (t *Torrent) hasSmartbanDataForPiece(piece pieceIndex) bool {
+       return t.smartBanCache.HasPeerForBlocks(iterRange(t.pieceRequestIndexBegin(piece), t.pieceRequestIndexBegin(piece+1)))
+}
+
 func (t *Torrent) countBytesHashed(n int64) {
        t.counters.BytesHashed.Add(n)
        t.cl.counters.BytesHashed.Add(n)
@@ -1276,25 +1280,30 @@ func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) (
        differingPeers map[bannableAddr]struct{},
        err error,
 ) {
+       var w io.Writer = h
+       if t.hasSmartbanDataForPiece(piece) {
+               smartBanWriter := t.getBlockCheckingWriterForPiece(piece)
+               w = io.MultiWriter(h, &smartBanWriter)
+               defer func() {
+                       if err != nil {
+                               // Skip smart banning since we can't blame them for storage issues. A short write would
+                               // ban peers for all recorded blocks that weren't just written.
+                               return
+                       }
+                       // Flush now, even though we may not have finished writing to the piece hash, since
+                       // further data is padding only and should not have come from peers.
+                       smartBanWriter.Flush()
+                       differingPeers = smartBanWriter.badPeers
+               }()
+       }
        p := t.piece(piece)
        storagePiece := p.Storage()
-
-       smartBanWriter := t.smartBanBlockCheckingWriter(piece)
-       multiWriter := io.MultiWriter(h, smartBanWriter)
-       {
-               var written int64
-               written, err = storagePiece.WriteTo(multiWriter)
-               if err == nil && written != int64(p.length()) {
-                       err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length())
-                       // Skip smart banning since we can't blame them for storage issues. A short write would
-                       // ban peers for all recorded blocks that weren't just written.
-                       return
-               }
-               t.countBytesHashed(written)
+       var written int64
+       written, err = storagePiece.WriteTo(w)
+       if err == nil && written != int64(p.length()) {
+               err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length())
        }
-       // Flush before writing padding, since we would not have recorded the padding blocks.
-       smartBanWriter.Flush()
-       differingPeers = smartBanWriter.badPeers
+       t.countBytesHashed(written)
        return
 }
 
@@ -2754,9 +2763,7 @@ func (t *Torrent) finishHash(index pieceIndex) {
                        t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
                }
                t.dropBannedPeers()
-               for ri := t.pieceRequestIndexBegin(index); ri < t.pieceRequestIndexBegin(index+1); ri++ {
-                       t.smartBanCache.ForgetBlock(ri)
-               }
+               t.smartBanCache.ForgetBlockSeq(iterRange(t.pieceRequestIndexBegin(index), t.pieceRequestIndexBegin(index+1)))
        }
        p.hashing = false
        t.pieceHashed(index, correct, copyErr)