]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add download rate limiting
authorMatt Joiner <anacrolix@gmail.com>
Mon, 10 Oct 2016 06:29:39 +0000 (17:29 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 10 Oct 2016 06:29:39 +0000 (17:29 +1100)
Fixes #121.

client.go
client_test.go
cmd/torrent/main.go
config.go
ratelimitreader.go [new file with mode: 0644]

index 51aaf193238a6504e4b87ffeadf7fd9618a087fb..d28f9b9db784e080eaeafb068081182f3d836108 100644 (file)
--- a/client.go
+++ b/client.go
@@ -74,8 +74,9 @@ type Client struct {
        // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
        extensionBytes peerExtensionBytes
        // The net.Addr.String part that should be common to all active listeners.
-       listenAddr  string
-       uploadLimit *rate.Limiter
+       listenAddr    string
+       uploadLimit   *rate.Limiter
+       downloadLimit *rate.Limiter
 
        // Set of addresses that have our client ID. This intentionally will
        // include ourselves if we end up trying to connect to our own address
@@ -263,6 +264,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
        } else {
                cl.uploadLimit = cfg.UploadRateLimiter
        }
+       if cfg.DownloadRateLimiter == nil {
+               cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
+       } else {
+               cl.downloadLimit = cfg.DownloadRateLimiter
+       }
        missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
        cl.event.L = &cl.mu
        storageImpl := cfg.DefaultStorage
@@ -1583,5 +1589,6 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) {
                PeerMaxRequests: 250,
        }
        c.setRW(connStatsReadWriter{nc, &cl.mu, c})
+       c.r = rateLimitedReader{cl.downloadLimit, c.r}
        return
 }
index 20aa96f58552e13708138b94aa9107fc02ea4496..0835a9760ea499e7325c01dfb180bc76b03a6d80 100644 (file)
@@ -271,7 +271,7 @@ func TestClientTransferDefault(t *testing.T) {
        })
 }
 
-func TestClientTransferRateLimited(t *testing.T) {
+func TestClientTransferRateLimitedUpload(t *testing.T) {
        started := time.Now()
        testClientTransfer(t, testClientTransferParams{
                // We are uploading 13 bytes (the length of the greeting torrent). The
@@ -282,6 +282,12 @@ func TestClientTransferRateLimited(t *testing.T) {
        require.True(t, time.Since(started) > time.Second)
 }
 
+func TestClientTransferRateLimitedDownload(t *testing.T) {
+       testClientTransfer(t, testClientTransferParams{
+               LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
+       })
+}
+
 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
        return storage.NewResourcePieces(fc.AsResourceProvider())
 }
@@ -344,13 +350,14 @@ func TestClientTransferVarious(t *testing.T) {
 }
 
 type testClientTransferParams struct {
-       Responsive              bool
-       Readahead               int64
-       SetReadahead            bool
-       ExportClientStatus      bool
-       LeecherStorage          func(string) storage.ClientImpl
-       SeederStorage           func(string) storage.ClientImpl
-       SeederUploadRateLimiter *rate.Limiter
+       Responsive                 bool
+       Readahead                  int64
+       SetReadahead               bool
+       ExportClientStatus         bool
+       LeecherStorage             func(string) storage.ClientImpl
+       SeederStorage              func(string) storage.ClientImpl
+       SeederUploadRateLimiter    *rate.Limiter
+       LeecherDownloadRateLimiter *rate.Limiter
 }
 
 // Creates a seeder and a leecher, and ensures the data transfers when a read
@@ -387,6 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        } else {
                cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
        }
+       cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
        // cfg.ListenAddr = "localhost:4001"
        leecher, err := NewClient(&cfg)
        require.NoError(t, err)
index f27e57247c38695928d8a3c1a0b3c157463b21e6..858c8054aeef0d7e295d05e4438d25980b63a010 100644 (file)
@@ -14,6 +14,7 @@ import (
        "github.com/anacrolix/tagflag"
        "github.com/dustin/go-humanize"
        "github.com/gosuri/uiprogress"
+       "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/metainfo"
@@ -110,13 +111,18 @@ func addTorrents(client *torrent.Client) {
        }
 }
 
-var flags struct {
-       Mmap     bool           `help:"memory-map torrent data"`
-       TestPeer []*net.TCPAddr `help:"addresses of some starting peers"`
-       Seed     bool           `help:"seed after download is complete"`
-       Addr     *net.TCPAddr   `help:"network listen addr"`
+var flags = struct {
+       Mmap         bool           `help:"memory-map torrent data"`
+       TestPeer     []*net.TCPAddr `help:"addresses of some starting peers"`
+       Seed         bool           `help:"seed after download is complete"`
+       Addr         *net.TCPAddr   `help:"network listen addr"`
+       UploadRate   tagflag.Bytes  `help:"max piece bytes to send per second"`
+       DownloadRate tagflag.Bytes  `help:"max bytes per second down from peers"`
        tagflag.StartPos
        Torrent []string `arity:"+" help:"torrent file path or magnet uri"`
+}{
+       UploadRate:   -1,
+       DownloadRate: -1,
 }
 
 func main() {
@@ -132,6 +138,12 @@ func main() {
        if flags.Seed {
                clientConfig.Seed = true
        }
+       if flags.UploadRate != -1 {
+               clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(flags.UploadRate), 256<<10)
+       }
+       if flags.DownloadRate != -1 {
+               clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(flags.DownloadRate), 1<<20)
+       }
 
        client, err := torrent.NewClient(&clientConfig)
        if err != nil {
index 5e82ff67a5bf15770423567051e4e13e2ee8e30e..11315b0ecbd9cd4f6e956d3b72a6d352d0752c78 100644 (file)
--- a/config.go
+++ b/config.go
@@ -33,7 +33,12 @@ type Config struct {
        Seed bool `long:"seed"`
        // Events are data bytes sent in pieces. The burst must be large enough to
        // fit a whole chunk.
-       UploadRateLimiter   *rate.Limiter
+       UploadRateLimiter *rate.Limiter
+       // The events are bytes read from connections. The burst must be bigger
+       // than the largest Read performed on a Conn minus one. This is likely to
+       // be the larger of the main read loop buffer (~4096), and the requested
+       // chunk size (~16KiB).
+       DownloadRateLimiter *rate.Limiter
 
        // User-provided Client peer ID. If not present, one is generated automatically.
        PeerID string
diff --git a/ratelimitreader.go b/ratelimitreader.go
new file mode 100644 (file)
index 0000000..236efd4
--- /dev/null
@@ -0,0 +1,25 @@
+package torrent
+
+import (
+       "context"
+       "io"
+       "time"
+
+       "golang.org/x/time/rate"
+)
+
+type rateLimitedReader struct {
+       l *rate.Limiter
+       r io.Reader
+}
+
+func (me rateLimitedReader) Read(b []byte) (n int, err error) {
+       if err := me.l.WaitN(context.Background(), 1); err != nil {
+               panic(err)
+       }
+       n, err = me.r.Read(b)
+       if !me.l.ReserveN(time.Now(), n-1).OK() {
+               panic(n - 1)
+       }
+       return
+}