This can be racy. In the TestReceiveChunkStorageFailure, when we have a storage write failure, we request the chunk again, but the peer has sometimes already sent it, and we return from the connection read loop with unexpected chunk after receiving it twice.
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
- validReceiveChunks map[request]struct{}
+ validReceiveChunks map[request]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
}
cn.requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
}
cn.requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
- cn.validReceiveChunks = make(map[request]struct{})
+ cn.validReceiveChunks = make(map[request]int)
- cn.validReceiveChunks[r] = struct{}{}
+ cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
+func runSafeExtraneous(f func()) {
+ if true {
+ go f()
+ } else {
+ f()
+ }
+}
+
// Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
// exit. Returning will end the connection.
func (c *PeerConn) mainReadLoop() (err error) {
// Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
// exit. Returning will end the connection.
func (c *PeerConn) mainReadLoop() (err error) {
}
messageTypesReceived.Add(msg.Type.String(), 1)
if msg.Type.FastExtension() && !c.fastEnabled() {
}
messageTypesReceived.Add(msg.Type.String(), 1)
if msg.Type.FastExtension() && !c.fastEnabled() {
+ runSafeExtraneous(func() { torrent.Add("fast messages received when extension is disabled", 1) })
return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
}
switch msg.Type {
return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
}
switch msg.Type {
err = c.peerSentHaveNone()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
err = c.peerSentHaveNone()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
- delete(c.validReceiveChunks, newRequestFromMessage(&msg))
+ c.decExpectedChunkReceive(newRequestFromMessage(&msg))
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
+func (c *PeerConn) decExpectedChunkReceive(r request) {
+ count := c.validReceiveChunks[r]
+ if count == 1 {
+ delete(c.validReceiveChunks, r)
+ } else if count > 1 {
+ c.validReceiveChunks[r] = count - 1
+ } else {
+ panic(r)
+ }
+}
+
func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
defer func() {
// TODO: Should we still do this?
func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
defer func() {
// TODO: Should we still do this?
torrent.Add("chunks received while choking", 1)
}
torrent.Add("chunks received while choking", 1)
}
- if _, ok := c.validReceiveChunks[req]; !ok {
+ if c.validReceiveChunks[req] <= 0 {
torrent.Add("chunks received unexpected", 1)
return errors.New("received unexpected chunk")
}
torrent.Add("chunks received unexpected", 1)
return errors.New("received unexpected chunk")
}
- delete(c.validReceiveChunks, req)
+ c.decExpectedChunkReceive(req)
if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) {
torrent.Add("chunks received due to allowed fast", 1)
if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) {
torrent.Add("chunks received due to allowed fast", 1)
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pieces[0]._dirtyChunks.Clear()
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pieces[0]._dirtyChunks.Clear()
- cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}}
+ cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1}
cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)
cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)