Choked bool
requests map[request]struct{}
requestsLowWater int
+ // Chunks that we might reasonably expect to receive from the peer. Due to
+ // latency, buffering, and implementation differences, we may receive
+ // chunks that are no longer in the set of requests actually want.
+ validReceiveChunks map[request]struct{}
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
- if cn.requests == nil {
- cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
- }
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
}
panic("requesting while choked and not allowed fast")
}
}
+ if cn.requests == nil {
+ cn.requests = make(map[request]struct{})
+ }
cn.requests[r] = struct{}{}
+ if cn.validReceiveChunks == nil {
+ cn.validReceiveChunks = make(map[request]struct{})
+ }
+ cn.validReceiveChunks[r] = struct{}{}
cn.t.pendingRequests[r]++
cn.t.lastRequested[r] = time.Now()
cn.updateExpectingChunks()
c.updateExpectingChunks()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
+ delete(c.validReceiveChunks, newRequestFromMessage(&msg))
case pp.Unchoke:
c.PeerChoked = false
c.tickleWriter()
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Piece:
- c.receiveChunk(&msg)
+ err = c.receiveChunk(&msg)
if len(msg.Piece) == int(t.chunkSize) {
t.chunkPool.Put(&msg.Piece)
}
}
// Handle a received chunk from a peer.
-func (c *connection) receiveChunk(msg *pp.Message) {
+func (c *connection) receiveChunk(msg *pp.Message) error {
t := c.t
cl := t.cl
torrent.Add("chunks received", 1)
req := newRequestFromMessage(msg)
+ if _, ok := c.validReceiveChunks[req]; !ok {
+ return errors.New("received unexpected chunk")
+ }
+ delete(c.validReceiveChunks, req)
+
// Request has been satisfied.
if c.deleteRequest(req) {
if c.expectingChunks() {
if t.haveChunk(req) {
torrent.Add("chunks received unwanted", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
- return
+ return nil
}
index := int(req.Index)
log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
t.pendRequest(req)
t.updatePieceCompletion(int(msg.Index))
- return
+ return nil
}
// It's important that the piece is potentially queued before we check if
cl.event.Broadcast()
t.publishPieceChange(int(req.Index))
+
+ return nil
}
func (c *connection) onDirtiedPiece(piece int) {