From d4cbdc5c38ac835ceedc5e477bfc335f3e71652f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 10 Oct 2016 17:29:39 +1100 Subject: [PATCH] Add download rate limiting Fixes #121. --- client.go | 11 +++++++++-- client_test.go | 24 ++++++++++++++++-------- cmd/torrent/main.go | 22 +++++++++++++++++----- config.go | 7 ++++++- ratelimitreader.go | 25 +++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 16 deletions(-) create mode 100644 ratelimitreader.go diff --git a/client.go b/client.go index 51aaf193..d28f9b9d 100644 --- a/client.go +++ b/client.go @@ -74,8 +74,9 @@ type Client struct { // Our BitTorrent protocol extension bytes, sent in our BT handshakes. extensionBytes peerExtensionBytes // The net.Addr.String part that should be common to all active listeners. - listenAddr string - uploadLimit *rate.Limiter + listenAddr string + uploadLimit *rate.Limiter + downloadLimit *rate.Limiter // Set of addresses that have our client ID. This intentionally will // include ourselves if we end up trying to connect to our own address @@ -263,6 +264,11 @@ func NewClient(cfg *Config) (cl *Client, err error) { } else { cl.uploadLimit = cfg.UploadRateLimiter } + if cfg.DownloadRateLimiter == nil { + cl.downloadLimit = rate.NewLimiter(rate.Inf, 0) + } else { + cl.downloadLimit = cfg.DownloadRateLimiter + } missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes) cl.event.L = &cl.mu storageImpl := cfg.DefaultStorage @@ -1583,5 +1589,6 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) { PeerMaxRequests: 250, } c.setRW(connStatsReadWriter{nc, &cl.mu, c}) + c.r = rateLimitedReader{cl.downloadLimit, c.r} return } diff --git a/client_test.go b/client_test.go index 20aa96f5..0835a976 100644 --- a/client_test.go +++ b/client_test.go @@ -271,7 +271,7 @@ func TestClientTransferDefault(t *testing.T) { }) } -func TestClientTransferRateLimited(t *testing.T) { +func TestClientTransferRateLimitedUpload(t *testing.T) { started := time.Now() testClientTransfer(t, testClientTransferParams{ // We are uploading 13 bytes (the length of the greeting torrent). The @@ -282,6 +282,12 @@ func TestClientTransferRateLimited(t *testing.T) { require.True(t, time.Since(started) > time.Second) } +func TestClientTransferRateLimitedDownload(t *testing.T) { + testClientTransfer(t, testClientTransferParams{ + LeecherDownloadRateLimiter: rate.NewLimiter(512, 512), + }) +} + func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl { return storage.NewResourcePieces(fc.AsResourceProvider()) } @@ -344,13 +350,14 @@ func TestClientTransferVarious(t *testing.T) { } type testClientTransferParams struct { - Responsive bool - Readahead int64 - SetReadahead bool - ExportClientStatus bool - LeecherStorage func(string) storage.ClientImpl - SeederStorage func(string) storage.ClientImpl - SeederUploadRateLimiter *rate.Limiter + Responsive bool + Readahead int64 + SetReadahead bool + ExportClientStatus bool + LeecherStorage func(string) storage.ClientImpl + SeederStorage func(string) storage.ClientImpl + SeederUploadRateLimiter *rate.Limiter + LeecherDownloadRateLimiter *rate.Limiter } // Creates a seeder and a leecher, and ensures the data transfers when a read @@ -387,6 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { } else { cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir) } + cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter // cfg.ListenAddr = "localhost:4001" leecher, err := NewClient(&cfg) require.NoError(t, err) diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index f27e5724..858c8054 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -14,6 +14,7 @@ import ( "github.com/anacrolix/tagflag" "github.com/dustin/go-humanize" "github.com/gosuri/uiprogress" + "golang.org/x/time/rate" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" @@ -110,13 +111,18 @@ func addTorrents(client *torrent.Client) { } } -var flags struct { - Mmap bool `help:"memory-map torrent data"` - TestPeer []*net.TCPAddr `help:"addresses of some starting peers"` - Seed bool `help:"seed after download is complete"` - Addr *net.TCPAddr `help:"network listen addr"` +var flags = struct { + Mmap bool `help:"memory-map torrent data"` + TestPeer []*net.TCPAddr `help:"addresses of some starting peers"` + Seed bool `help:"seed after download is complete"` + Addr *net.TCPAddr `help:"network listen addr"` + UploadRate tagflag.Bytes `help:"max piece bytes to send per second"` + DownloadRate tagflag.Bytes `help:"max bytes per second down from peers"` tagflag.StartPos Torrent []string `arity:"+" help:"torrent file path or magnet uri"` +}{ + UploadRate: -1, + DownloadRate: -1, } func main() { @@ -132,6 +138,12 @@ func main() { if flags.Seed { clientConfig.Seed = true } + if flags.UploadRate != -1 { + clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(flags.UploadRate), 256<<10) + } + if flags.DownloadRate != -1 { + clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(flags.DownloadRate), 1<<20) + } client, err := torrent.NewClient(&clientConfig) if err != nil { diff --git a/config.go b/config.go index 5e82ff67..11315b0e 100644 --- a/config.go +++ b/config.go @@ -33,7 +33,12 @@ type Config struct { Seed bool `long:"seed"` // Events are data bytes sent in pieces. The burst must be large enough to // fit a whole chunk. - UploadRateLimiter *rate.Limiter + UploadRateLimiter *rate.Limiter + // The events are bytes read from connections. The burst must be bigger + // than the largest Read performed on a Conn minus one. This is likely to + // be the larger of the main read loop buffer (~4096), and the requested + // chunk size (~16KiB). + DownloadRateLimiter *rate.Limiter // User-provided Client peer ID. If not present, one is generated automatically. PeerID string diff --git a/ratelimitreader.go b/ratelimitreader.go new file mode 100644 index 00000000..236efd4d --- /dev/null +++ b/ratelimitreader.go @@ -0,0 +1,25 @@ +package torrent + +import ( + "context" + "io" + "time" + + "golang.org/x/time/rate" +) + +type rateLimitedReader struct { + l *rate.Limiter + r io.Reader +} + +func (me rateLimitedReader) Read(b []byte) (n int, err error) { + if err := me.l.WaitN(context.Background(), 1); err != nil { + panic(err) + } + n, err = me.r.Read(b) + if !me.l.ReserveN(time.Now(), n-1).OK() { + panic(n - 1) + } + return +} -- 2.44.0