"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
+ "github.com/anacrolix/torrent/types/infohash"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
gbtree "github.com/google/btree"
}
type AddTorrentOpts struct {
- InfoHash InfoHash
+ InfoHash infohash.T
Storage storage.ClientImpl
ChunkSize pp.Integer
}
connString: opts.connString,
conn: nc,
}
+ c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
c.initRequestState()
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
if opts.remoteAddr != nil {
// How long between writes before sending a keep alive message on a peer connection that we want
// to maintain.
KeepAliveTimeout time.Duration
+ // Maximum bytes to buffer per peer connection for peer request data before it is sent.
+ MaxAllocPeerRequestDataPerConn int64
// The IP addresses as our peers should see them. May differ from the
// local interfaces due to NAT or other network configurations.
TorrentPeersLowWater: 50,
HandshakesTimeout: 4 * time.Second,
KeepAliveTimeout: time.Minute,
+ MaxAllocPeerRequestDataPerConn: 1 << 20,
ListenHost: func(string) string { return "" },
UploadRateLimiter: unlimited,
DownloadRateLimiter: unlimited,
--- /dev/null
+package alloclim
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ _ "github.com/anacrolix/envpprof"
+ qt "github.com/frankban/quicktest"
+)
+
+func TestReserveOverMax(t *testing.T) {
+ c := qt.New(t)
+ l := &Limiter{Max: 10}
+ r := l.Reserve(20)
+ c.Assert(r.Wait(context.Background()), qt.IsNotNil)
+}
+
+func TestImmediateAllow(t *testing.T) {
+ c := qt.New(t)
+ l := &Limiter{Max: 10}
+ r := l.Reserve(10)
+ c.Assert(r.Wait(context.Background()), qt.IsNil)
+}
+
+func TestSimpleSequence(t *testing.T) {
+ c := qt.New(t)
+ l := &Limiter{Max: 10}
+ rs := make([]*Reservation, 0)
+ rs = append(rs, l.Reserve(6))
+ rs = append(rs, l.Reserve(5))
+ rs = append(rs, l.Reserve(5))
+ c.Assert(rs[0].Wait(context.Background()), qt.IsNil)
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Nanosecond))
+ c.Assert(rs[1].Wait(ctx), qt.Equals, context.DeadlineExceeded)
+ go cancel()
+ ctx, cancel = context.WithCancel(context.Background())
+ go cancel()
+ c.Assert(rs[2].Wait(ctx), qt.Equals, context.Canceled)
+ go rs[0].Release()
+ ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ c.Assert(rs[1].Wait(ctx), qt.IsNil)
+ go rs[1].Release()
+ c.Assert(rs[2].Wait(ctx), qt.IsNil)
+ go rs[2].Release()
+ go cancel()
+ rs[2].Release()
+ rs[1].Release()
+ c.Assert(l.Value(), qt.Equals, l.Max)
+}
+
+func TestSequenceWithCancel(t *testing.T) {
+ c := qt.New(t)
+ l := &Limiter{Max: 10}
+ rs := make([]*Reservation, 0)
+ rs = append(rs, l.Reserve(6))
+ rs = append(rs, l.Reserve(6))
+ rs = append(rs, l.Reserve(4))
+ rs = append(rs, l.Reserve(4))
+ c.Assert(rs[0].Cancel(), qt.IsFalse)
+ c.Assert(func() { rs[1].Release() }, qt.PanicMatches, "not resolved")
+ c.Assert(rs[1].Cancel(), qt.IsTrue)
+ c.Assert(rs[2].Wait(context.Background()), qt.IsNil)
+ rs[0].Release()
+ c.Assert(rs[3].Wait(context.Background()), qt.IsNil)
+ c.Assert(l.Value(), qt.Equals, int64(2))
+ rs[1].Release()
+ rs[2].Release()
+ rs[3].Release()
+ c.Assert(l.Value(), qt.Equals, l.Max)
+}
+
+func TestCancelWhileWaiting(t *testing.T) {
+ c := qt.New(t)
+ l := &Limiter{Max: 10}
+ rs := make([]*Reservation, 0)
+ rs = append(rs, l.Reserve(6))
+ rs = append(rs, l.Reserve(6))
+ rs = append(rs, l.Reserve(4))
+ rs = append(rs, l.Reserve(4))
+ go rs[1].Cancel()
+ err := rs[1].Wait(context.Background())
+ c.Assert(err, qt.IsNotNil)
+ err = rs[2].Wait(context.Background())
+ c.Assert(err, qt.IsNil)
+ ctx, cancel := context.WithCancel(context.Background())
+ go cancel()
+ err = rs[3].Wait(ctx)
+ c.Assert(err, qt.Equals, context.Canceled)
+ rs[0].Drop()
+ err = rs[3].Wait(ctx)
+ c.Assert(err, qt.IsNil)
+}
--- /dev/null
+package alloclim
+
+import "sync"
+
+// Manages reservations sharing a common allocation limit.
+type Limiter struct {
+ // Maximum outstanding allocation space.
+ Max int64
+ initOnce sync.Once
+ mu sync.Mutex
+ // Current unallocated space.
+ value int64
+ // Reservations waiting to be granted, in the order they arrived.
+ waiting []*Reservation
+}
+
+func (me *Limiter) initValue() {
+ me.value = me.Max
+}
+
+func (me *Limiter) init() {
+ me.initOnce.Do(func() {
+ me.initValue()
+ })
+}
+
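+// Reserve claims n bytes against the limit. The claim is granted immediately if enough space is
+// free; otherwise the Reservation queues behind earlier ones. Wait on it before using the space,
+// and Release, Cancel or Drop it when done.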
+func (me *Limiter) Reserve(n int64) *Reservation {
+ r := &Reservation{
+ l: me,
+ n: n,
+ }
+ me.init()
+ me.mu.Lock()
+ if n <= me.value {
+ me.value -= n
+ r.granted.Set()
+ } else {
+ me.waiting = append(me.waiting, r)
+ }
+ me.mu.Unlock()
+ return r
+}
+
+func (me *Limiter) doWakesLocked() {
+ for {
+ if len(me.waiting) == 0 {
+ break
+ }
+ r := me.waiting[0]
+ switch {
+ case r.cancelled.IsSet():
+ case r.n <= me.value:
+ if r.wake() {
+ me.value -= r.n
+ }
+ default:
+ return
+ }
+ me.waiting = me.waiting[1:]
+ }
+}
+
+func (me *Limiter) doWakes() {
+ me.mu.Lock()
+ me.doWakesLocked()
+ me.mu.Unlock()
+}
+
+func (me *Limiter) addValue(n int64) {
+ me.mu.Lock()
+ me.value += n
+ me.doWakesLocked()
+ me.mu.Unlock()
+}
+
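+// Value returns the amount of space currently unallocated.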
+func (me *Limiter) Value() int64 {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ return me.value
+}
--- /dev/null
+package alloclim
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/anacrolix/chansync"
+)
+
+type Reservation struct {
+ l *Limiter
+ n int64
+ releaseOnce sync.Once
+ mu sync.Mutex
+ granted chansync.SetOnce
+ cancelled chansync.SetOnce
+}
+
+// Releases the alloc claim if the reservation has been granted. Does nothing if it was cancelled.
+// Otherwise panics.
+func (me *Reservation) Release() {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ switch {
+ default:
+ panic("not resolved")
+ case me.cancelled.IsSet():
+ return
+ case me.granted.IsSet():
+ }
+ me.releaseOnce.Do(func() {
+ me.l.addValue(me.n)
+ })
+}
+
+// Cancels the reservation. Returns false if it was already granted, in which case you must still
+// release it. See Drop.
+func (me *Reservation) Cancel() bool {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if me.granted.IsSet() {
+ return false
+ }
+ if me.cancelled.Set() {
+ go me.l.doWakes()
+ }
+ return true
+}
+
+// If the reservation has been granted, release it; otherwise cancel it.
+func (me *Reservation) Drop() {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if me.granted.IsSet() {
+ me.releaseOnce.Do(func() {
+ me.l.addValue(me.n)
+ })
+ return
+ }
+ if me.cancelled.Set() {
+ go me.l.doWakes()
+ }
+}
+
+func (me *Reservation) wake() bool {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if me.cancelled.IsSet() {
+ return false
+ }
+ return me.granted.Set()
+}
+
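+// Wait blocks until the reservation is granted, cancelled, or the context is done. It errors
+// immediately if the reservation can never be granted because it exceeds the limiter's Max.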
+func (me *Reservation) Wait(ctx context.Context) error {
+ if me.n > me.l.Max {
+ return fmt.Errorf("reservation for %v exceeds limiter max %v", me.n, me.l.Max)
+ }
+ select {
+ case <-ctx.Done():
+ case <-me.granted.Done():
+ case <-me.cancelled.Done():
+ }
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ switch {
+ case me.granted.IsSet():
+ return nil
+ case me.cancelled.IsSet():
+ return errors.New("reservation cancelled")
+ case ctx.Err() != nil:
+ return ctx.Err()
+ default:
+ panic("unexpected")
+ }
+}
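+
+// exampleReservationUsage is a minimal sketch of how a Limiter and its Reservations are meant to
+// be used together; the limit, reservation size, and buffer here are arbitrary illustrations, not
+// values used elsewhere in the package.
+func exampleReservationUsage(ctx context.Context) error {
+ l := &Limiter{Max: 1 << 20}
+ // Claim space for a hypothetical 16 KiB buffer.
+ r := l.Reserve(16 << 10)
+ if err := r.Wait(ctx); err != nil {
+ // The context expired, the reservation was cancelled, or it can never be granted.
+ return err
+ }
+ // Hold the claim for as long as the buffer is in use.
+ defer r.Release()
+ buf := make([]byte, 16<<10)
+ _ = buf
+ return nil
+}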
import (
"bufio"
"bytes"
+ "context"
"errors"
"fmt"
"io"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
+ "github.com/anacrolix/torrent/internal/alloclim"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
)
type peerRequestState struct {
- data []byte
+ data []byte
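+ // Reserves space for data against the connection's alloc limiter until the chunk is sent or the request is dropped.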
+ allocReservation *alloclim.Reservation
}
type PeerRemoteAddr interface {
// The peer has everything. This can occur due to a special message, when
// we may not even know the number of pieces in the torrent yet.
peerSentHaveAll bool
+
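+ // Limits the amount of outstanding peer request data buffered for this connection at any one time.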
+ peerRequestDataAllocLimiter alloclim.Limiter
}
func (cn *PeerConn) peerImplStatusLines() []string {
Type: pp.Choke,
})
if !cn.fastEnabled() {
- cn.peerRequests = nil
+ cn.deleteAllPeerRequests()
}
return
}
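+// Drops the alloc reservations held by outstanding peer requests and clears them all.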
+func (cn *PeerConn) deleteAllPeerRequests() {
+ for _, state := range cn.peerRequests {
+ state.allocReservation.Drop()
+ }
+ cn.peerRequests = nil
+}
+
func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
if !cn.choking {
return true
panic("fast not enabled")
}
c.write(r.ToMsg(pp.Reject))
- delete(c.peerRequests, r)
+ // A request can be rejected without ever having been added to peerRequests, e.g. because it was invalid.
+ if state, ok := c.peerRequests[r]; ok {
+ state.allocReservation.Drop()
+ delete(c.peerRequests, r)
+ }
}
func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
if c.peerRequests == nil {
c.peerRequests = make(map[Request]*peerRequestState, localClientReqq)
}
- value := &peerRequestState{}
+ value := &peerRequestState{
+ allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)),
+ }
c.peerRequests[r] = value
if startFetch {
// TODO: Limit peer request data read concurrency.
}
func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
- b, err := readPeerRequestData(r, c)
+ b, err := c.readPeerRequestData(r, prs)
c.locker().Lock()
defer c.locker().Unlock()
if err != nil {
}
}
-func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
+func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
+ // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
+ // or to fail the read and then clean up.
+ ctx := context.Background()
+ err := prs.allocReservation.Wait(ctx)
+ if err != nil {
+ if ctx.Err() == nil {
+ // The error is from the reservation itself. Something is very broken, or we're not
+ // guarding against excessively large requests.
+ err = log.WithLevel(log.Critical, err)
+ }
+ err = fmt.Errorf("waiting for alloc limit reservation: %w", err)
+ return nil, err
+ }
b := make([]byte, r.Length)
p := c.t.info.Piece(int(r.Index))
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
c.lastChunkSent = time.Now()
+ state.allocReservation.Release()
return msg(pp.Message{
Type: pp.Piece,
Index: r.Index,
cfg := torrent.TestingConfig(t)
// cfg.Debug = true
cfg.Seed = true
+ // Less than a piece, more than a single request.
+ cfg.MaxAllocPeerRequestDataPerConn = 4
// Some test instances don't like this being on, even when there's no cache involved.
cfg.DropMutuallyCompletePeers = false
if ps.SeederUploadRateLimiter != nil {
cfg := torrent.TestingConfig(t)
cfg.Seed = true
+ cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.DataDir = greetingTempDir
seeder, err := torrent.NewClient(cfg)
require.NoError(t, err)
cfg = torrent.TestingConfig(t)
cfg.Seed = false
cfg.DataDir = t.TempDir()
+ cfg.MaxAllocPeerRequestDataPerConn = 4
leecherLeecher, _ := torrent.NewClient(cfg)
require.NoError(t, err)
defer leecherLeecher.Close()