]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Drop connections that send chunks we shouldn't receive
authorMatt Joiner <anacrolix@gmail.com>
Sun, 24 Jun 2018 10:35:46 +0000 (20:35 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 24 Jun 2018 10:35:46 +0000 (20:35 +1000)
connection.go

index 85aaabdcf019fce9698ab7096ef2bf23d87a1dcb..a019ff2062f14b4b95b13d026f7ec0dfd614c673 100644 (file)
@@ -76,6 +76,10 @@ type connection struct {
        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
+       // Chunks that we might reasonably expect to receive from the peer. Due to
+       // latency, buffering, and implementation differences, we may receive
+       // chunks that are no longer in the set of requests actually want.
+       validReceiveChunks map[request]struct{}
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
@@ -475,9 +479,6 @@ type messageWriter func(pp.Message) bool
 
 // Proxies the messageWriter's response.
 func (cn *connection) request(r request, mw messageWriter) bool {
-       if cn.requests == nil {
-               cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
-       }
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
@@ -497,7 +498,14 @@ func (cn *connection) request(r request, mw messageWriter) bool {
                        panic("requesting while choked and not allowed fast")
                }
        }
+       if cn.requests == nil {
+               cn.requests = make(map[request]struct{})
+       }
        cn.requests[r] = struct{}{}
+       if cn.validReceiveChunks == nil {
+               cn.validReceiveChunks = make(map[request]struct{})
+       }
+       cn.validReceiveChunks[r] = struct{}{}
        cn.t.pendingRequests[r]++
        cn.t.lastRequested[r] = time.Now()
        cn.updateExpectingChunks()
@@ -1099,6 +1107,7 @@ func (c *connection) mainReadLoop() (err error) {
                        c.updateExpectingChunks()
                case pp.Reject:
                        c.deleteRequest(newRequestFromMessage(&msg))
+                       delete(c.validReceiveChunks, newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
@@ -1126,7 +1135,7 @@ func (c *connection) mainReadLoop() (err error) {
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
-                       c.receiveChunk(&msg)
+                       err = c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(&msg.Piece)
                        }
@@ -1271,13 +1280,18 @@ func (cn *connection) rw() io.ReadWriter {
 }
 
 // Handle a received chunk from a peer.
-func (c *connection) receiveChunk(msg *pp.Message) {
+func (c *connection) receiveChunk(msg *pp.Message) error {
        t := c.t
        cl := t.cl
        torrent.Add("chunks received", 1)
 
        req := newRequestFromMessage(msg)
 
+       if _, ok := c.validReceiveChunks[req]; !ok {
+               return errors.New("received unexpected chunk")
+       }
+       delete(c.validReceiveChunks, req)
+
        // Request has been satisfied.
        if c.deleteRequest(req) {
                if c.expectingChunks() {
@@ -1299,7 +1313,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        if t.haveChunk(req) {
                torrent.Add("chunks received unwanted", 1)
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
-               return
+               return nil
        }
 
        index := int(req.Index)
@@ -1343,7 +1357,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
                log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(int(msg.Index))
-               return
+               return nil
        }
 
        // It's important that the piece is potentially queued before we check if
@@ -1357,6 +1371,8 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
        cl.event.Broadcast()
        t.publishPieceChange(int(req.Index))
+
+       return nil
 }
 
 func (c *connection) onDirtiedPiece(piece int) {