X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn.go;h=bd6b376b508d7b9606e6b4ad5e61e354a472e7c9;hb=12279621e4e2bf5c6f230c9e8668206519e79c9e;hp=989b2f816a6398ebe9e41fa1e3718c4a676f376b;hpb=ae4eb8569b9ecaa0bb1ec766c2c7ef5bab33ec35;p=btrtrc.git diff --git a/peerconn.go b/peerconn.go index 989b2f81..bd6b376b 100644 --- a/peerconn.go +++ b/peerconn.go @@ -5,6 +5,7 @@ import ( "bytes" "errors" "fmt" + "golang.org/x/time/rate" "io" "math/rand" "net" @@ -986,10 +987,22 @@ func (c *PeerConn) reject(r Request) { delete(c.peerRequests, r) } -func (c *PeerConn) onReadRequest(r Request) error { +func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) { + uploadRateLimiter := c.t.cl.config.UploadRateLimiter + if uploadRateLimiter.Limit() == rate.Inf { + return + } + return Some(uploadRateLimiter.Burst()) +} + +// startFetch is for testing purposes currently. +func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) if _, ok := c.peerRequests[r]; ok { torrent.Add("duplicate requests received", 1) + if c.fastEnabled() { + return errors.New("received duplicate request with fast enabled") + } return nil } if c.choking { @@ -1009,10 +1022,18 @@ func (c *PeerConn) onReadRequest(r Request) error { // BEP 6 says we may close here if we choose. return nil } + if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value { + err := fmt.Errorf("peer requested chunk too long (%v)", r.Length) + c.logger.Levelf(log.Warning, err.Error()) + if c.fastEnabled() { + c.reject(r) + return nil + } else { + return err + } + } if !c.t.havePiece(pieceIndex(r.Index)) { - // This isn't necessarily them screwing up. We can drop pieces - // from our storage, and can't communicate this to peers - // except by reconnecting. + // TODO: Tell the peer we don't have the piece, and reject this request. requestsReceivedForMissingPieces.Add(1) return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int()) } @@ -1026,7 +1047,10 @@ func (c *PeerConn) onReadRequest(r Request) error { } value := &peerRequestState{} c.peerRequests[r] = value - go c.peerRequestDataReader(r, value) + if startFetch { + // TODO: Limit peer request data read concurrency. + go c.peerRequestDataReader(r, value) + } return nil } @@ -1222,7 +1246,7 @@ func (c *PeerConn) mainReadLoop() (err error) { err = c.peerSentBitfield(msg.Bitfield) case pp.Request: r := newRequestFromMessage(&msg) - err = c.onReadRequest(r) + err = c.onReadRequest(r, true) case pp.Piece: c.doChunkReadStats(int64(len(msg.Piece))) err = c.receiveChunk(&msg)