]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Check that incoming peer request chunk lengths don't exceed the upload rate limiter...
[btrtrc.git] / peerconn.go
index 4597ea479cf6f375b4cc4b443d274b04e491c178..bd6b376b508d7b9606e6b4ad5e61e354a472e7c9 100644 (file)
@@ -5,6 +5,7 @@ import (
        "bytes"
        "errors"
        "fmt"
+       "golang.org/x/time/rate"
        "io"
        "math/rand"
        "net"
@@ -366,7 +367,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int))
                        count++
                } else {
                        if count != 0 {
-                               f(last.Value(), count)
+                               f(last.Value, count)
                        }
                        last = item
                        count = 1
@@ -986,10 +987,22 @@ func (c *PeerConn) reject(r Request) {
        delete(c.peerRequests, r)
 }
 
-func (c *PeerConn) onReadRequest(r Request) error {
+func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
+       uploadRateLimiter := c.t.cl.config.UploadRateLimiter
+       if uploadRateLimiter.Limit() == rate.Inf {
+               return
+       }
+       return Some(uploadRateLimiter.Burst())
+}
+
+// startFetch is for testing purposes currently.
+func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if _, ok := c.peerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
+               if c.fastEnabled() {
+                       return errors.New("received duplicate request with fast enabled")
+               }
                return nil
        }
        if c.choking {
@@ -1009,10 +1022,18 @@ func (c *PeerConn) onReadRequest(r Request) error {
                // BEP 6 says we may close here if we choose.
                return nil
        }
+       if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
+               err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
+               c.logger.Levelf(log.Warning, err.Error())
+               if c.fastEnabled() {
+                       c.reject(r)
+                       return nil
+               } else {
+                       return err
+               }
+       }
        if !c.t.havePiece(pieceIndex(r.Index)) {
-               // This isn't necessarily them screwing up. We can drop pieces
-               // from our storage, and can't communicate this to peers
-               // except by reconnecting.
+               // TODO: Tell the peer we don't have the piece, and reject this request.
                requestsReceivedForMissingPieces.Add(1)
                return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
        }
@@ -1026,7 +1047,10 @@ func (c *PeerConn) onReadRequest(r Request) error {
        }
        value := &peerRequestState{}
        c.peerRequests[r] = value
-       go c.peerRequestDataReader(r, value)
+       if startFetch {
+               // TODO: Limit peer request data read concurrency.
+               go c.peerRequestDataReader(r, value)
+       }
        return nil
 }
 
@@ -1042,6 +1066,7 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
                }
                torrent.Add("peer request data read successes", 1)
                prs.data = b
+               // This might be required for the error case too (#752 and #753).
                c.tickleWriter()
        }
 }
@@ -1221,7 +1246,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.Request:
                        r := newRequestFromMessage(&msg)
-                       err = c.onReadRequest(r)
+                       err = c.onReadRequest(r, true)
                case pp.Piece:
                        c.doChunkReadStats(int64(len(msg.Piece)))
                        err = c.receiveChunk(&msg)
@@ -1399,8 +1424,8 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        req := c.t.requestIndexFromRequest(ppReq)
        t := c.t
 
-       if c.bannableAddr.Ok() {
-               t.smartBanCache.RecordBlock(c.bannableAddr.Value(), req, msg.Piece)
+       if c.bannableAddr.Ok {
+               t.smartBanCache.RecordBlock(c.bannableAddr.Value, req, msg.Piece)
        }
 
        if c.peerChoking {
@@ -1635,7 +1660,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        if c.t.requestingPeer(r) != c {
                panic("only one peer should have a given request at a time")
        }
-       c.t.requestState[r] = requestState{}
+       delete(c.t.requestState, r)
        // c.t.iterPeers(func(p *Peer) {
        //      if p.isLowOnRequests() {
        //              p.updateRequests("Peer.deleteRequest")