]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Limit peer request data allocation
authorMatt Joiner <anacrolix@gmail.com>
Tue, 14 Feb 2023 01:46:23 +0000 (12:46 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 14 Feb 2023 01:46:49 +0000 (12:46 +1100)
This follows up from abb5cbc96e301a4ca1f5df698b105ae8553ce1e9. We currently limit how many requests peers can send us, but didn't really check that peers didn't make us allocate huge amounts of space to buffer their requests. I'm sure there's some rough edges here.

client.go
config.go
internal/alloclim/alloclim_test.go [new file with mode: 0644]
internal/alloclim/l.go [new file with mode: 0644]
internal/alloclim/r.go [new file with mode: 0644]
peerconn.go
test/leecher-storage.go
test/transfer_test.go

index 4adf28b723dae966b83c34fbaf295d38e58e204d..167cf0cbfce963535b32a98b6d0d5160f345a560 100644 (file)
--- a/client.go
+++ b/client.go
@@ -31,6 +31,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
+       "github.com/anacrolix/torrent/types/infohash"
        "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        gbtree "github.com/google/btree"
@@ -1286,7 +1287,7 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
 }
 
 type AddTorrentOpts struct {
-       InfoHash  InfoHash
+       InfoHash  infohash.T
        Storage   storage.ClientImpl
        ChunkSize pp.Integer
 }
@@ -1505,6 +1506,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
                connString: opts.connString,
                conn:       nc,
        }
+       c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
        c.initRequestState()
        // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
        if opts.remoteAddr != nil {
index e1e6452af1699ac8c509f8d25b45846d74096908..1764168972e59fb4e6adad67d33cf23d2566245e 100644 (file)
--- a/config.go
+++ b/config.go
@@ -148,6 +148,8 @@ type ClientConfig struct {
        // How long between writes before sending a keep alive message on a peer connection that we want
        // to maintain.
        KeepAliveTimeout time.Duration
+       // Maximum bytes to buffer per peer connection for peer request data before it is sent.
+       MaxAllocPeerRequestDataPerConn int64
 
        // The IP addresses as our peers should see them. May differ from the
        // local interfaces due to NAT or other network configurations.
@@ -205,6 +207,7 @@ func NewDefaultClientConfig() *ClientConfig {
                TorrentPeersLowWater:           50,
                HandshakesTimeout:              4 * time.Second,
                KeepAliveTimeout:               time.Minute,
+               MaxAllocPeerRequestDataPerConn: 1 << 20,
                ListenHost:                     func(string) string { return "" },
                UploadRateLimiter:              unlimited,
                DownloadRateLimiter:            unlimited,
diff --git a/internal/alloclim/alloclim_test.go b/internal/alloclim/alloclim_test.go
new file mode 100644 (file)
index 0000000..5952804
--- /dev/null
@@ -0,0 +1,93 @@
+package alloclim
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       _ "github.com/anacrolix/envpprof"
+       qt "github.com/frankban/quicktest"
+)
+
+func TestReserveOverMax(t *testing.T) {
+       c := qt.New(t)
+       l := &Limiter{Max: 10}
+       r := l.Reserve(20)
+       c.Assert(r.Wait(context.Background()), qt.IsNotNil)
+}
+
+func TestImmediateAllow(t *testing.T) {
+       c := qt.New(t)
+       l := &Limiter{Max: 10}
+       r := l.Reserve(10)
+       c.Assert(r.Wait(context.Background()), qt.IsNil)
+}
+
+func TestSimpleSequence(t *testing.T) {
+       c := qt.New(t)
+       l := &Limiter{Max: 10}
+       rs := make([]*Reservation, 0)
+       rs = append(rs, l.Reserve(6))
+       rs = append(rs, l.Reserve(5))
+       rs = append(rs, l.Reserve(5))
+       c.Assert(rs[0].Wait(context.Background()), qt.IsNil)
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Nanosecond))
+       c.Assert(rs[1].Wait(ctx), qt.Equals, context.DeadlineExceeded)
+       go cancel()
+       ctx, cancel = context.WithCancel(context.Background())
+       go cancel()
+       c.Assert(rs[2].Wait(ctx), qt.Equals, context.Canceled)
+       go rs[0].Release()
+       ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       c.Assert(rs[1].Wait(ctx), qt.IsNil)
+       go rs[1].Release()
+       c.Assert(rs[2].Wait(ctx), qt.IsNil)
+       go rs[2].Release()
+       go cancel()
+       rs[2].Release()
+       rs[1].Release()
+       c.Assert(l.Value(), qt.Equals, l.Max)
+}
+
+func TestSequenceWithCancel(t *testing.T) {
+       c := qt.New(t)
+       l := &Limiter{Max: 10}
+       rs := make([]*Reservation, 0)
+       rs = append(rs, l.Reserve(6))
+       rs = append(rs, l.Reserve(6))
+       rs = append(rs, l.Reserve(4))
+       rs = append(rs, l.Reserve(4))
+       c.Assert(rs[0].Cancel(), qt.IsFalse)
+       c.Assert(func() { rs[1].Release() }, qt.PanicMatches, "not resolved")
+       c.Assert(rs[1].Cancel(), qt.IsTrue)
+       c.Assert(rs[2].Wait(context.Background()), qt.IsNil)
+       rs[0].Release()
+       c.Assert(rs[3].Wait(context.Background()), qt.IsNil)
+       c.Assert(l.Value(), qt.Equals, int64(2))
+       rs[1].Release()
+       rs[2].Release()
+       rs[3].Release()
+       c.Assert(l.Value(), qt.Equals, l.Max)
+}
+
+func TestCancelWhileWaiting(t *testing.T) {
+       c := qt.New(t)
+       l := &Limiter{Max: 10}
+       rs := make([]*Reservation, 0)
+       rs = append(rs, l.Reserve(6))
+       rs = append(rs, l.Reserve(6))
+       rs = append(rs, l.Reserve(4))
+       rs = append(rs, l.Reserve(4))
+       go rs[1].Cancel()
+       err := rs[1].Wait(context.Background())
+       c.Assert(err, qt.IsNotNil)
+       err = rs[2].Wait(context.Background())
+       c.Assert(err, qt.IsNil)
+       ctx, cancel := context.WithCancel(context.Background())
+       go cancel()
+       err = rs[3].Wait(ctx)
+       c.Assert(err, qt.Equals, context.Canceled)
+       rs[0].Drop()
+       err = rs[3].Wait(ctx)
+       c.Assert(err, qt.IsNil)
+}
diff --git a/internal/alloclim/l.go b/internal/alloclim/l.go
new file mode 100644 (file)
index 0000000..98be1a1
--- /dev/null
@@ -0,0 +1,80 @@
+package alloclim
+
+import "sync"
+
+// Manages reservations sharing a common allocation limit.
+type Limiter struct {
+       // Maximum outstanding allocation space.
+       Max      int64
+       initOnce sync.Once
+       mu       sync.Mutex
+       // Current unallocated space.
+       value int64
+       // Reservations waiting to in the order they arrived.
+       waiting []*Reservation
+}
+
+func (me *Limiter) initValue() {
+       me.value = me.Max
+}
+
+func (me *Limiter) init() {
+       me.initOnce.Do(func() {
+               me.initValue()
+       })
+}
+
+func (me *Limiter) Reserve(n int64) *Reservation {
+       r := &Reservation{
+               l: me,
+               n: n,
+       }
+       me.init()
+       me.mu.Lock()
+       if n <= me.value {
+               me.value -= n
+               r.granted.Set()
+       } else {
+               me.waiting = append(me.waiting, r)
+       }
+       me.mu.Unlock()
+       return r
+}
+
+func (me *Limiter) doWakesLocked() {
+       for {
+               if len(me.waiting) == 0 {
+                       break
+               }
+               r := me.waiting[0]
+               switch {
+               case r.cancelled.IsSet():
+               case r.n <= me.value:
+                       if r.wake() {
+                               me.value -= r.n
+                       }
+               default:
+                       return
+               }
+               me.waiting = me.waiting[1:]
+       }
+}
+
+func (me *Limiter) doWakes() {
+       me.mu.Lock()
+       me.doWakesLocked()
+       me.mu.Unlock()
+}
+
+func (me *Limiter) addValue(n int64) {
+       me.mu.Lock()
+       me.value += n
+       me.doWakesLocked()
+       me.mu.Unlock()
+}
+
+func (me *Limiter) Value() int64 {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       return me.value
+}
diff --git a/internal/alloclim/r.go b/internal/alloclim/r.go
new file mode 100644 (file)
index 0000000..b84be66
--- /dev/null
@@ -0,0 +1,97 @@
+package alloclim
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+
+       "github.com/anacrolix/chansync"
+)
+
+type Reservation struct {
+       l           *Limiter
+       n           int64
+       releaseOnce sync.Once
+       mu          sync.Mutex
+       granted     chansync.SetOnce
+       cancelled   chansync.SetOnce
+}
+
+// Releases the alloc claim if the reservation has been granted. Does nothing if it was cancelled.
+// Otherwise panics.
+func (me *Reservation) Release() {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       switch {
+       default:
+               panic("not resolved")
+       case me.cancelled.IsSet():
+               return
+       case me.granted.IsSet():
+       }
+       me.releaseOnce.Do(func() {
+               me.l.addValue(me.n)
+       })
+}
+
+// Cancel the reservation, returns false if it was already granted. You must still release if that's
+// the case. See Drop.
+func (me *Reservation) Cancel() bool {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if me.granted.IsSet() {
+               return false
+       }
+       if me.cancelled.Set() {
+               go me.l.doWakes()
+       }
+       return true
+}
+
+// If the reservation is granted, release it, otherwise cancel the reservation.
+func (me *Reservation) Drop() {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if me.granted.IsSet() {
+               me.releaseOnce.Do(func() {
+                       me.l.addValue(me.n)
+               })
+               return
+       }
+       if me.cancelled.Set() {
+               go me.l.doWakes()
+       }
+}
+
+func (me *Reservation) wake() bool {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if me.cancelled.IsSet() {
+               return false
+       }
+       return me.granted.Set()
+}
+
+func (me *Reservation) Wait(ctx context.Context) error {
+       if me.n > me.l.Max {
+               return fmt.Errorf("reservation for %v exceeds limiter max %v", me.n, me.l.Max)
+       }
+       select {
+       case <-ctx.Done():
+       case <-me.granted.Done():
+       case <-me.cancelled.Done():
+       }
+       defer me.mu.Unlock()
+       me.mu.Lock()
+       switch {
+       case me.granted.IsSet():
+               return nil
+       case me.cancelled.IsSet():
+               return errors.New("reservation cancelled")
+       case ctx.Err() != nil:
+               return ctx.Err()
+       default:
+               panic("unexpected")
+       }
+}
index fdc8236d600f726fef653f051c27a8160859410d..af9c084410e758f562fe4eaa236ab621d6952a1f 100644 (file)
@@ -3,6 +3,7 @@ package torrent
 import (
        "bufio"
        "bytes"
+       "context"
        "errors"
        "fmt"
        "io"
@@ -20,6 +21,7 @@ import (
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
+       "github.com/anacrolix/torrent/internal/alloclim"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
@@ -43,7 +45,8 @@ const (
 )
 
 type peerRequestState struct {
-       data []byte
+       data             []byte
+       allocReservation *alloclim.Reservation
 }
 
 type PeerRemoteAddr interface {
@@ -169,6 +172,8 @@ type PeerConn struct {
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerSentHaveAll bool
+
+       peerRequestDataAllocLimiter alloclim.Limiter
 }
 
 func (cn *PeerConn) peerImplStatusLines() []string {
@@ -547,11 +552,18 @@ func (cn *PeerConn) choke(msg messageWriter) (more bool) {
                Type: pp.Choke,
        })
        if !cn.fastEnabled() {
-               cn.peerRequests = nil
+               cn.deleteAllPeerRequests()
        }
        return
 }
 
+func (cn *PeerConn) deleteAllPeerRequests() {
+       for _, state := range cn.peerRequests {
+               state.allocReservation.Drop()
+       }
+       cn.peerRequests = nil
+}
+
 func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
        if !cn.choking {
                return true
@@ -992,7 +1004,11 @@ func (c *PeerConn) reject(r Request) {
                panic("fast not enabled")
        }
        c.write(r.ToMsg(pp.Reject))
-       delete(c.peerRequests, r)
+       // It is possible to reject a request before it is added to peer requests due to being invalid.
+       if state, ok := c.peerRequests[r]; ok {
+               state.allocReservation.Drop()
+               delete(c.peerRequests, r)
+       }
 }
 
 func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
@@ -1066,7 +1082,9 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
        if c.peerRequests == nil {
                c.peerRequests = make(map[Request]*peerRequestState, localClientReqq)
        }
-       value := &peerRequestState{}
+       value := &peerRequestState{
+               allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)),
+       }
        c.peerRequests[r] = value
        if startFetch {
                // TODO: Limit peer request data read concurrency.
@@ -1076,7 +1094,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 }
 
 func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
-       b, err := readPeerRequestData(r, c)
+       b, err := c.readPeerRequestData(r, prs)
        c.locker().Lock()
        defer c.locker().Unlock()
        if err != nil {
@@ -1133,7 +1151,20 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
        }
 }
 
-func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
+func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
+       // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
+       // or fail to read and then cleanup.
+       ctx := context.Background()
+       err := prs.allocReservation.Wait(ctx)
+       if err != nil {
+               if ctx.Err() == nil {
+                       // The error is from the reservation itself. Something is very broken, or we're not
+                       // guarding against excessively large requests.
+                       err = log.WithLevel(log.Critical, err)
+               }
+               err = fmt.Errorf("waiting for alloc limit reservation: %w", err)
+               return nil, err
+       }
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
@@ -1740,6 +1771,7 @@ func (c *PeerConn) tickleWriter() {
 
 func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
        c.lastChunkSent = time.Now()
+       state.allocReservation.Release()
        return msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
index eac152d6605be04daa6007dba76ca9a354db3c19..bfb3816db94c6281b1adf280f59de28b7e94e595 100644 (file)
@@ -108,6 +108,8 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        cfg := torrent.TestingConfig(t)
        // cfg.Debug = true
        cfg.Seed = true
+       // Less than a piece, more than a single request.
+       cfg.MaxAllocPeerRequestDataPerConn = 4
        // Some test instances don't like this being on, even when there's no cache involved.
        cfg.DropMutuallyCompletePeers = false
        if ps.SeederUploadRateLimiter != nil {
index 2c77e673f0f9c96c196695bed784bdf602c99701..b96c94d3298d931fcb5f0768373ef50b97119c54 100644 (file)
@@ -138,6 +138,7 @@ func TestSeedAfterDownloading(t *testing.T) {
 
        cfg := torrent.TestingConfig(t)
        cfg.Seed = true
+       cfg.MaxAllocPeerRequestDataPerConn = 4
        cfg.DataDir = greetingTempDir
        seeder, err := torrent.NewClient(cfg)
        require.NoError(t, err)
@@ -159,6 +160,7 @@ func TestSeedAfterDownloading(t *testing.T) {
        cfg = torrent.TestingConfig(t)
        cfg.Seed = false
        cfg.DataDir = t.TempDir()
+       cfg.MaxAllocPeerRequestDataPerConn = 4
        leecherLeecher, _ := torrent.NewClient(cfg)
        require.NoError(t, err)
        defer leecherLeecher.Close()