From: Matt Joiner Date: Sat, 25 Jun 2022 13:16:58 +0000 (+1000) Subject: Check that incoming peer request chunk lengths don't exceed the upload rate limiter... X-Git-Tag: v1.46.0 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=refs%2Ftags%2Fv1.46.0;p=btrtrc.git Check that incoming peer request chunk lengths don't exceed the upload rate limiter burst size Should fix #759. --- diff --git a/client.go b/client.go index e71b6702..70d3d103 100644 --- a/client.go +++ b/client.go @@ -973,7 +973,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return fmt.Errorf("adding connection: %w", err) } defer t.dropConnection(c) - c.startWriter() + c.startMessageWriter() cl.sendInitialMessages(c, t) c.initUpdateRequestsTimer() err := c.mainReadLoop() diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 87ed4750..b30d34e0 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -12,7 +12,7 @@ import ( pp "github.com/anacrolix/torrent/peer_protocol" ) -func (pc *PeerConn) startWriter() { +func (pc *PeerConn) initMessageWriter() { w := &pc.messageWriter *w = peerConnMsgWriter{ fillWriteBuffer: func() { @@ -33,12 +33,18 @@ func (pc *PeerConn) startWriter() { }, writeBuffer: new(bytes.Buffer), } - go func() { - defer pc.locker().Unlock() - defer pc.close() - defer pc.locker().Lock() - pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout) - }() +} + +func (pc *PeerConn) startMessageWriter() { + pc.initMessageWriter() + go pc.messageWriterRunner() +} + +func (pc *PeerConn) messageWriterRunner() { + defer pc.locker().Unlock() + defer pc.close() + defer pc.locker().Lock() + pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout) } type peerConnMsgWriter struct { diff --git a/peerconn.go b/peerconn.go index 989b2f81..bd6b376b 100644 --- a/peerconn.go +++ b/peerconn.go @@ -5,6 +5,7 @@ import ( "bytes" "errors" "fmt" + "golang.org/x/time/rate" "io" "math/rand" "net" @@ -986,10 +987,22 @@ func (c *PeerConn) reject(r Request) { delete(c.peerRequests, r) } -func (c *PeerConn) onReadRequest(r Request) error { +func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) { + uploadRateLimiter := c.t.cl.config.UploadRateLimiter + if uploadRateLimiter.Limit() == rate.Inf { + return + } + return Some(uploadRateLimiter.Burst()) +} + +// startFetch is for testing purposes currently. +func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) if _, ok := c.peerRequests[r]; ok { torrent.Add("duplicate requests received", 1) + if c.fastEnabled() { + return errors.New("received duplicate request with fast enabled") + } return nil } if c.choking { @@ -1009,10 +1022,18 @@ func (c *PeerConn) onReadRequest(r Request) error { // BEP 6 says we may close here if we choose. return nil } + if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value { + err := fmt.Errorf("peer requested chunk too long (%v)", r.Length) + c.logger.Levelf(log.Warning, err.Error()) + if c.fastEnabled() { + c.reject(r) + return nil + } else { + return err + } + } if !c.t.havePiece(pieceIndex(r.Index)) { - // This isn't necessarily them screwing up. We can drop pieces - // from our storage, and can't communicate this to peers - // except by reconnecting. + // TODO: Tell the peer we don't have the piece, and reject this request. requestsReceivedForMissingPieces.Add(1) return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int()) } @@ -1026,7 +1047,10 @@ func (c *PeerConn) onReadRequest(r Request) error { } value := &peerRequestState{} c.peerRequests[r] = value - go c.peerRequestDataReader(r, value) + if startFetch { + // TODO: Limit peer request data read concurrency. + go c.peerRequestDataReader(r, value) + } return nil } @@ -1222,7 +1246,7 @@ func (c *PeerConn) mainReadLoop() (err error) { err = c.peerSentBitfield(msg.Bitfield) case pp.Request: r := newRequestFromMessage(&msg) - err = c.onReadRequest(r) + err = c.onReadRequest(r, true) case pp.Piece: c.doChunkReadStats(int64(len(msg.Piece))) err = c.receiveChunk(&msg) diff --git a/peerconn_test.go b/peerconn_test.go index 1bd9712b..2f1fc56a 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "golang.org/x/time/rate" "io" "net" "sync" @@ -32,7 +33,7 @@ func TestSendBitfieldThenHave(t *testing.T) { r, w := io.Pipe() // c.r = r c.w = w - c.startWriter() + c.startMessageWriter() c.locker().Lock() c.t._completedPieces.Add(1) c.postBitfield( /*[]bool{false, true, false}*/ ) @@ -282,3 +283,32 @@ func TestPreferredNetworkDirection(t *testing.T) { // No difference c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse) } + +func TestReceiveLargeRequest(t *testing.T) { + c := qt.New(t) + cl := newTestingClient(t) + pc := cl.newConnection(nil, false, nil, "test", "") + tor := cl.newTorrentForTesting() + tor.info = &metainfo.Info{PieceLength: 3 << 20} + pc.setTorrent(tor) + tor._completedPieces.Add(0) + pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true) + pc.choking = false + pc.initMessageWriter() + req := Request{} + req.Length = defaultChunkSize + c.Assert(pc.fastEnabled(), qt.IsTrue) + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 1) + req.Length = 2 << 20 + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 2) + pc.peerRequests = nil + pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize) + req.Length = defaultChunkSize + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.peerRequests, qt.HasLen, 1) + req.Length = 2 << 20 + c.Check(pc.onReadRequest(req, false), qt.IsNil) + c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17) +}