"bufio"
"context"
"crypto/rand"
+ "crypto/sha1"
"encoding/binary"
"errors"
"expvar"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
+ "github.com/anacrolix/torrent/option"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
// through legitimate channels.
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
+ bannedPrefixes map[string]struct{}
torrents map[InfoHash]*Torrent
pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
MaxConnsPerHost: 10,
},
}
+ cl.bannedPrefixes = make(map[banPrefix]struct{})
}
func NewClient(cfg *ClientConfig) (cl *Client, err error) {
if ipa, ok := tryIpPortFromNetAddr(addr); ok {
return cl.badPeerIPPort(ipa.IP, ipa.Port)
}
+ addrStr := addr.String()
+ for prefix := range cl.bannedPrefixes {
+ if strings.HasPrefix(addrStr, prefix) {
+ return true
+ }
+ }
return false
}
webSeeds: make(map[string]*Peer),
gotMetainfoC: make(chan struct{}),
}
+ t.smartBanCache.Hash = sha1.Sum
+ t.smartBanCache.Init()
t.networkingEnabled.Set()
t.logger = cl.logger.WithContextValue(t)
if opts.ChunkSize == 0 {
connString: connString,
conn: nc,
}
+ if remoteAddr != nil {
+ c.banPrefix = option.Some(remoteAddr.String())
+ }
c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
c.setRW(connStatsReadWriter{nc, c})
func (cl *Client) ConnStats() ConnStats {
return cl.stats.Copy()
}
+
+func (cl *Client) banPrefix(prefix banPrefix) {
+ cl.bannedPrefixes[prefix] = struct{}{}
+}
--- /dev/null
+package option
+
+type T[V any] struct {
+ ok bool
+ value V
+}
+
+func (me *T[V]) Ok() bool {
+ return me.ok
+}
+
+func (me *T[V]) Value() V {
+ if !me.ok {
+ panic("not set")
+ }
+ return me.value
+}
+
+func Some[V any](value V) T[V] {
+ return T[V]{ok: true, value: value}
+}
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
+ "github.com/anacrolix/torrent/option"
"github.com/anacrolix/chansync"
"github.com/anacrolix/torrent/bencode"
outgoing bool
Network string
RemoteAddr PeerRemoteAddr
+ banPrefix option.T[string]
// True if the connection is operating over MSE obfuscation.
headerEncrypted bool
cryptoMethod mse.CryptoMethod
ppReq := newRequestFromMessage(msg)
req := c.t.requestIndexFromRequest(ppReq)
+ t := c.t
+
+ if c.banPrefix.Ok() {
+ t.smartBanCache.RecordBlock(c.banPrefix.Value(), req, msg.Piece)
+ }
if c.peerChoking {
chunksReceived.Add("while choked", 1)
}
}
- t := c.t
cl := t.cl
// Do we actually want this chunk?
--- /dev/null
+package torrent
+
+import (
+ "bytes"
+ "crypto/sha1"
+
+ "github.com/anacrolix/torrent/smartban"
+)
+
+type banPrefix = string
+
+type smartBanCache = smartban.Cache[banPrefix, RequestIndex, [sha1.Size]byte]
+
+type blockCheckingWriter struct {
+ cache *smartBanCache
+ requestIndex RequestIndex
+ // Peers that didn't match blocks written now.
+ badPeers map[banPrefix]struct{}
+ blockBuffer bytes.Buffer
+ chunkSize int
+}
+
+func (me *blockCheckingWriter) checkBlock() {
+ b := me.blockBuffer.Next(me.chunkSize)
+ for _, peer := range me.cache.CheckBlock(me.requestIndex, b) {
+ me.badPeers[peer] = struct{}{}
+ }
+ me.requestIndex++
+}
+
+func (me *blockCheckingWriter) checkFullBlocks() {
+ for me.blockBuffer.Len() >= me.chunkSize {
+ me.checkBlock()
+ }
+}
+
+func (me *blockCheckingWriter) Write(b []byte) (int, error) {
+ n, err := me.blockBuffer.Write(b)
+ if err != nil {
+ // bytes.Buffer.Write should never fail.
+ panic(err)
+ }
+ me.checkFullBlocks()
+ return n, err
+}
+
+// Check any remaining block data. Terminal pieces or piece sizes that don't divide into the chunk
+// size cleanly may leave fragments that should be checked.
+func (me *blockCheckingWriter) Flush() {
+ for me.blockBuffer.Len() != 0 {
+ me.checkBlock()
+ }
+}
--- /dev/null
+package smartban
+
+import (
+ "sync"
+)
+
+type Cache[Peer, BlockKey, Hash comparable] struct {
+ Hash func([]byte) Hash
+
+ lock sync.RWMutex
+ blocks map[BlockKey]map[Peer]Hash
+}
+
+type Block[Key any] struct {
+ Key Key
+ Data []byte
+}
+
+func (me *Cache[Peer, BlockKey, Hash]) Init() {
+ me.blocks = make(map[BlockKey]map[Peer]Hash)
+}
+
+func (me *Cache[Peer, BlockKey, Hash]) RecordBlock(peer Peer, key BlockKey, data []byte) {
+ hash := me.Hash(data)
+ me.lock.Lock()
+ defer me.lock.Unlock()
+ peers := me.blocks[key]
+ if peers == nil {
+ peers = make(map[Peer]Hash)
+ me.blocks[key] = peers
+ }
+ peers[peer] = hash
+}
+
+func (me *Cache[Peer, BlockKey, Hash]) CheckBlock(key BlockKey, data []byte) (bad []Peer) {
+ correct := me.Hash(data)
+ me.lock.RLock()
+ defer me.lock.RUnlock()
+ for peer, hash := range me.blocks[key] {
+ if hash != correct {
+ bad = append(bad, peer)
+ }
+ }
+ return
+}
+
+func (me *Cache[Peer, BlockKey, Hash]) ForgetBlock(key BlockKey) {
+ me.lock.Lock()
+ defer me.lock.Unlock()
+ delete(me.blocks, key)
+}
// Is On when all pieces are complete.
Complete chansync.Flag
+
+ smartBanCache smartBanCache
}
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
return pp.Integer(t.info.PieceLength)
}
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
+func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
+ return &blockCheckingWriter{
+ cache: &t.smartBanCache,
+ requestIndex: t.pieceRequestIndexOffset(piece),
+ chunkSize: t.chunkSize.Int(),
+ }
+}
+
+func (t *Torrent) hashPiece(piece pieceIndex) (
+ ret metainfo.Hash,
+ // These are peers that sent us blocks that differ from what we hash here.
+ differingPeers map[banPrefix]struct{},
+ err error,
+) {
p := t.piece(piece)
p.waitNoPendingWrites()
storagePiece := t.pieces[piece].Storage()
hash := pieceHash.New()
const logPieceContents = false
+ smartBanWriter := t.smartBanBlockCheckingWriter(piece)
+ writers := []io.Writer{hash, smartBanWriter}
+ var examineBuf bytes.Buffer
+ if logPieceContents {
+ writers = append(writers, &examineBuf)
+ }
+ _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
if logPieceContents {
- var examineBuf bytes.Buffer
- _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
- } else {
- _, err = storagePiece.WriteTo(hash)
}
+ smartBanWriter.Flush()
+ differingPeers = smartBanWriter.badPeers
missinggo.CopyExact(&ret, hash.Sum(nil))
return
}
func (t *Torrent) pieceHasher(index pieceIndex) {
p := t.piece(index)
- sum, copyErr := t.hashPiece(index)
+ sum, failedPeers, copyErr := t.hashPiece(index)
correct := sum == *p.hash
+ if correct {
+ for peer := range failedPeers {
+ log.Printf("would smart ban %q for %v here", peer, p)
+ t.cl.banPrefix(peer)
+ }
+ }
switch copyErr {
case nil, io.EOF:
default:
// requests mark more often, so recomputation is probably sooner than with regular peer
// conns. ~4x maxRequests would be about right.
PeerMaxRequests: 128,
- RemoteAddr: remoteAddrFromUrl(url),
- callbacks: t.callbacks(),
+ // TODO: Set ban prefix?
+ RemoteAddr: remoteAddrFromUrl(url),
+ callbacks: t.callbacks(),
},
client: webseed.Client{
HttpClient: t.cl.webseedHttpClient,