]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Check that incoming peer request chunk lengths don't exceed the upload rate limiter... v1.46.0
authorMatt Joiner <anacrolix@gmail.com>
Sat, 25 Jun 2022 13:16:58 +0000 (23:16 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 25 Jun 2022 13:16:58 +0000 (23:16 +1000)
Should fix #759.

client.go
peer-conn-msg-writer.go
peerconn.go
peerconn_test.go

index e71b67029fbeeba112289f1aaf718489491c8ee4..70d3d103d19170874dd8e9368731ea1cb31548ef 100644 (file)
--- a/client.go
+++ b/client.go
@@ -973,7 +973,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
                return fmt.Errorf("adding connection: %w", err)
        }
        defer t.dropConnection(c)
-       c.startWriter()
+       c.startMessageWriter()
        cl.sendInitialMessages(c, t)
        c.initUpdateRequestsTimer()
        err := c.mainReadLoop()
index 87ed47505f952684340083b5726d577e4bfc1932..b30d34e07eac3f6501a0bd1126108c99c5c95d57 100644 (file)
@@ -12,7 +12,7 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
-func (pc *PeerConn) startWriter() {
+func (pc *PeerConn) initMessageWriter() {
        w := &pc.messageWriter
        *w = peerConnMsgWriter{
                fillWriteBuffer: func() {
@@ -33,12 +33,18 @@ func (pc *PeerConn) startWriter() {
                },
                writeBuffer: new(bytes.Buffer),
        }
-       go func() {
-               defer pc.locker().Unlock()
-               defer pc.close()
-               defer pc.locker().Lock()
-               pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
-       }()
+}
+
+func (pc *PeerConn) startMessageWriter() {
+       pc.initMessageWriter()
+       go pc.messageWriterRunner()
+}
+
+func (pc *PeerConn) messageWriterRunner() {
+       defer pc.locker().Unlock()
+       defer pc.close()
+       defer pc.locker().Lock()
+       pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
 }
 
 type peerConnMsgWriter struct {
index 989b2f816a6398ebe9e41fa1e3718c4a676f376b..bd6b376b508d7b9606e6b4ad5e61e354a472e7c9 100644 (file)
@@ -5,6 +5,7 @@ import (
        "bytes"
        "errors"
        "fmt"
+       "golang.org/x/time/rate"
        "io"
        "math/rand"
        "net"
@@ -986,10 +987,22 @@ func (c *PeerConn) reject(r Request) {
        delete(c.peerRequests, r)
 }
 
-func (c *PeerConn) onReadRequest(r Request) error {
+func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
+       uploadRateLimiter := c.t.cl.config.UploadRateLimiter
+       if uploadRateLimiter.Limit() == rate.Inf {
+               return
+       }
+       return Some(uploadRateLimiter.Burst())
+}
+
+// startFetch is for testing purposes currently.
+func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if _, ok := c.peerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
+               if c.fastEnabled() {
+                       return errors.New("received duplicate request with fast enabled")
+               }
                return nil
        }
        if c.choking {
@@ -1009,10 +1022,18 @@ func (c *PeerConn) onReadRequest(r Request) error {
                // BEP 6 says we may close here if we choose.
                return nil
        }
+       if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
+               err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
+               c.logger.Levelf(log.Warning, err.Error())
+               if c.fastEnabled() {
+                       c.reject(r)
+                       return nil
+               } else {
+                       return err
+               }
+       }
        if !c.t.havePiece(pieceIndex(r.Index)) {
-               // This isn't necessarily them screwing up. We can drop pieces
-               // from our storage, and can't communicate this to peers
-               // except by reconnecting.
+               // TODO: Tell the peer we don't have the piece, and reject this request.
                requestsReceivedForMissingPieces.Add(1)
                return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
        }
@@ -1026,7 +1047,10 @@ func (c *PeerConn) onReadRequest(r Request) error {
        }
        value := &peerRequestState{}
        c.peerRequests[r] = value
-       go c.peerRequestDataReader(r, value)
+       if startFetch {
+               // TODO: Limit peer request data read concurrency.
+               go c.peerRequestDataReader(r, value)
+       }
        return nil
 }
 
@@ -1222,7 +1246,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.Request:
                        r := newRequestFromMessage(&msg)
-                       err = c.onReadRequest(r)
+                       err = c.onReadRequest(r, true)
                case pp.Piece:
                        c.doChunkReadStats(int64(len(msg.Piece)))
                        err = c.receiveChunk(&msg)
index 1bd9712b86ff8d04c28ee2d9eea340f3a7714ce0..2f1fc56aa3bf888866690323df0d4799df9db79e 100644 (file)
@@ -4,6 +4,7 @@ import (
        "encoding/binary"
        "errors"
        "fmt"
+       "golang.org/x/time/rate"
        "io"
        "net"
        "sync"
@@ -32,7 +33,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
        r, w := io.Pipe()
        // c.r = r
        c.w = w
-       c.startWriter()
+       c.startMessageWriter()
        c.locker().Lock()
        c.t._completedPieces.Add(1)
        c.postBitfield( /*[]bool{false, true, false}*/ )
@@ -282,3 +283,32 @@ func TestPreferredNetworkDirection(t *testing.T) {
        // No difference
        c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
 }
+
+func TestReceiveLargeRequest(t *testing.T) {
+       c := qt.New(t)
+       cl := newTestingClient(t)
+       pc := cl.newConnection(nil, false, nil, "test", "")
+       tor := cl.newTorrentForTesting()
+       tor.info = &metainfo.Info{PieceLength: 3 << 20}
+       pc.setTorrent(tor)
+       tor._completedPieces.Add(0)
+       pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
+       pc.choking = false
+       pc.initMessageWriter()
+       req := Request{}
+       req.Length = defaultChunkSize
+       c.Assert(pc.fastEnabled(), qt.IsTrue)
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.peerRequests, qt.HasLen, 1)
+       req.Length = 2 << 20
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.peerRequests, qt.HasLen, 2)
+       pc.peerRequests = nil
+       pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
+       req.Length = defaultChunkSize
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.peerRequests, qt.HasLen, 1)
+       req.Length = 2 << 20
+       c.Check(pc.onReadRequest(req, false), qt.IsNil)
+       c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
+}