Fixes #905.
"bufio"
"context"
"crypto/rand"
- "crypto/sha1"
"encoding/binary"
"encoding/hex"
"errors"
"expvar"
"fmt"
+ "github.com/cespare/xxhash"
"io"
"math"
"net"
webSeeds: make(map[string]*Peer),
gotMetainfoC: make(chan struct{}),
}
- t.smartBanCache.Hash = sha1.Sum
+ var salt [8]byte
+ rand.Read(salt[:])
+ t.smartBanCache.Hash = func(b []byte) uint64 {
+ h := xxhash.New()
+ h.Write(salt[:])
+ h.Write(b)
+ return h.Sum64()
+ }
t.smartBanCache.Init()
t.networkingEnabled.Set()
t.logger = cl.logger.WithDefaultLevel(log.Debug)
github.com/anacrolix/utp v0.1.0
github.com/bahlo/generic-list-go v0.2.0
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8
+ github.com/cespare/xxhash v1.1.0
github.com/davecgh/go-spew v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/edsrzf/mmap-go v1.1.0
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
"io"
"net"
"strings"
+ "sync"
"time"
"github.com/RoaringBitmap/roaring"
}
req := c.t.requestIndexFromRequest(ppReq)
- if c.bannableAddr.Ok {
- t.smartBanCache.RecordBlock(c.bannableAddr.Value, req, msg.Piece)
- }
+ recordBlockForSmartBan := sync.OnceFunc(func() {
+ c.recordBlockForSmartBan(req, msg.Piece)
+ })
+ // This needs to occur before we return, but we try to do it when the client is unlocked. It
+ // can't be done before checking if chunks are valid because they won't be deallocated by piece
+ // hashing if they're out of bounds.
+ defer recordBlockForSmartBan()
if c.peerChoking {
chunksReceived.Add("while choked", 1)
err = func() error {
cl.unlock()
defer cl.lock()
+ // Opportunistically do this here while we aren't holding the client lock.
+ recordBlockForSmartBan()
concurrentChunkWrites.Add(1)
defer concurrentChunkWrites.Add(-1)
// Write the chunk out. Note that the upper bound on chunk writing concurrency will be the
// }
p.peakRequests--
}
+
+func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) {
+ if p.bannableAddr.Ok {
+ p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData)
+ }
+}
"encoding/binary"
"errors"
"fmt"
+ g "github.com/anacrolix/generics"
"io"
"net"
+ "net/netip"
"sync"
"testing"
}
type torrentStorage struct {
- writeSem sync.Mutex
+ allChunksWritten sync.WaitGroup
}
func (me *torrentStorage) Close() error { return nil }
if len(b) != defaultChunkSize {
panic(len(b))
}
- me.writeSem.Unlock()
+ me.allChunksWritten.Done()
return len(b), nil
}
t.onSetInfo()
t._pendingPieces.Add(0)
r, w := net.Pipe()
+ c.Logf("pipe reader remote addr: %v", r.RemoteAddr())
cn := cl.newConnection(r, newConnectionOpts{
- outgoing: true,
- remoteAddr: r.RemoteAddr(),
+ outgoing: true,
+ // TODO: This is a hack to give the pipe a bannable remote address.
+ remoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 2, 3, 4}), 1234),
network: r.RemoteAddr().Network(),
connString: regularNetConnPeerConnConnString(r),
})
+ c.Assert(cn.bannableAddr.Ok, qt.IsTrue)
cn.setTorrent(t)
- mrlErrChan := make(chan error)
- msg := pp.Message{
- Type: pp.Piece,
- Piece: make([]byte, defaultChunkSize),
+ requestIndexBegin := t.pieceRequestIndexOffset(0)
+ requestIndexEnd := t.pieceRequestIndexOffset(1)
+ eachRequestIndex := func(f func(ri RequestIndex)) {
+ for ri := requestIndexBegin; ri < requestIndexEnd; ri++ {
+ f(ri)
+ }
}
+ const chunkSize = defaultChunkSize
+ numRequests := requestIndexEnd - requestIndexBegin
+ msgBufs := make([][]byte, 0, numRequests)
+ eachRequestIndex(func(ri RequestIndex) {
+ msgBufs = append(msgBufs, pp.Message{
+ Type: pp.Piece,
+ Piece: make([]byte, chunkSize),
+ Begin: pp.Integer(chunkSize) * pp.Integer(ri),
+ }.MustMarshalBinary())
+ })
+ // errgroup can't handle this pattern...
+ allErrors := make(chan error, 2)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
+ defer wg.Done()
cl.lock()
err := cn.mainReadLoop()
- if err != nil {
- mrlErrChan <- err
+ if errors.Is(err, io.EOF) {
+ err = nil
}
- close(mrlErrChan)
+ allErrors <- err
}()
- wb := msg.MustMarshalBinary()
- b.SetBytes(int64(len(msg.Piece)))
+ b.SetBytes(chunkSize * int64(numRequests))
+ wg.Add(1)
go func() {
- ts.writeSem.Lock()
+ defer wg.Done()
for i := 0; i < b.N; i += 1 {
cl.lock()
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pendAllChunkSpecs(0)
- cn.validReceiveChunks = map[RequestIndex]int{
- t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
- }
+ g.MakeMapIfNil(&cn.validReceiveChunks)
+ eachRequestIndex(func(ri RequestIndex) {
+ cn.validReceiveChunks[ri] = 1
+ })
cl.unlock()
- n, err := w.Write(wb)
- require.NoError(b, err)
- require.EqualValues(b, len(wb), n)
- ts.writeSem.Lock()
+ ts.allChunksWritten.Add(int(numRequests))
+ for _, wb := range msgBufs {
+ n, err := w.Write(wb)
+ require.NoError(b, err)
+ require.EqualValues(b, len(wb), n)
+ }
+ // This is unlocked by a successful write to storage. So this unblocks when that is
+ // done.
+ ts.allChunksWritten.Wait()
}
if err := w.Close(); err != nil {
panic(err)
}
}()
- mrlErr := <-mrlErrChan
- if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
- c.Fatal(mrlErr)
+ go func() {
+ wg.Wait()
+ close(allErrors)
+ }()
+ var err error
+ for err = range allErrors {
+ if err != nil {
+ break
+ }
}
- c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
+ c.Assert(err, qt.IsNil)
+ c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N)*int64(numRequests))
+ c.Assert(t.smartBanCache.HasBlocks(), qt.IsTrue)
}
func TestConnPexPeerFlags(t *testing.T) {
import (
"bytes"
- "crypto/sha1"
"net/netip"
g "github.com/anacrolix/generics"
type bannableAddr = netip.Addr
-type smartBanCache = smartban.Cache[bannableAddr, RequestIndex, [sha1.Size]byte]
+type smartBanCache = smartban.Cache[bannableAddr, RequestIndex, uint64]
type blockCheckingWriter struct {
cache *smartBanCache
package smartban
import (
+ g "github.com/anacrolix/generics"
"sync"
)
Hash func([]byte) Hash
lock sync.RWMutex
- blocks map[BlockKey]map[Peer]Hash
+ blocks map[BlockKey][]peerAndHash[Peer, Hash]
}
type Block[Key any] struct {
Data []byte
}
+type peerAndHash[Peer, Hash any] struct {
+ Peer Peer
+ Hash Hash
+}
+
func (me *Cache[Peer, BlockKey, Hash]) Init() {
- me.blocks = make(map[BlockKey]map[Peer]Hash)
+ g.MakeMap(&me.blocks)
}
func (me *Cache[Peer, BlockKey, Hash]) RecordBlock(peer Peer, key BlockKey, data []byte) {
me.lock.Lock()
defer me.lock.Unlock()
peers := me.blocks[key]
- if peers == nil {
- peers = make(map[Peer]Hash)
- me.blocks[key] = peers
- }
- peers[peer] = hash
+ peers = append(peers, peerAndHash[Peer, Hash]{peer, hash})
+ me.blocks[key] = peers
}
func (me *Cache[Peer, BlockKey, Hash]) CheckBlock(key BlockKey, data []byte) (bad []Peer) {
correct := me.Hash(data)
me.lock.RLock()
defer me.lock.RUnlock()
- for peer, hash := range me.blocks[key] {
- if hash != correct {
- bad = append(bad, peer)
+ for _, item := range me.blocks[key] {
+ if item.Hash != correct {
+ bad = append(bad, item.Peer)
}
}
return
defer me.lock.Unlock()
delete(me.blocks, key)
}
+
+func (me *Cache[Peer, BlockKey, Hash]) HasBlocks() bool {
+ me.lock.RLock()
+ defer me.lock.RUnlock()
+ return len(me.blocks) != 0
+}
--- /dev/null
+package torrent
+
+import (
+ "crypto/sha1"
+ "github.com/anacrolix/missinggo/v2/iter"
+ "github.com/anacrolix/torrent/smartban"
+ "github.com/cespare/xxhash"
+ "net/netip"
+ "testing"
+)
+
+func benchmarkSmartBanRecordBlock[Sum comparable](b *testing.B, hash func([]byte) Sum) {
+ var cache smartban.Cache[bannableAddr, RequestIndex, Sum]
+ cache.Hash = hash
+ cache.Init()
+ var data [defaultChunkSize]byte
+ var addr netip.Addr
+ b.SetBytes(int64(len(data)))
+ for i := range iter.N(b.N) {
+ cache.RecordBlock(addr, RequestIndex(i), data[:])
+ }
+}
+
+func BenchmarkSmartBanRecordBlock(b *testing.B) {
+ b.Run("xxHash", func(b *testing.B) {
+ var salt [8]byte
+ benchmarkSmartBanRecordBlock(b, func(block []byte) uint64 {
+ h := xxhash.New()
+ // xxHash is not cryptographic, and so we're salting it so attackers can't know a priori
+ // where block data collisions are.
+ h.Write(salt[:])
+ h.Write(block)
+ return h.Sum64()
+ })
+ })
+ b.Run("Sha1", func(b *testing.B) {
+ benchmarkSmartBanRecordBlock(b, sha1.Sum)
+ })
+}
. "github.com/anacrolix/generics"
g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
- "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
}
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
- defer perf.ScopeTimerErr(&err)()
+ //defer perf.ScopeTimerErr(&err)()
n, err := t.pieces[piece].Storage().WriteAt(data, begin)
if err == nil && n != len(data) {
err = io.ErrShortWrite