]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Count expected received chunks instead of flagging them
authorMatt Joiner <anacrolix@gmail.com>
Thu, 23 Apr 2020 02:34:43 +0000 (12:34 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 23 Apr 2020 02:34:43 +0000 (12:34 +1000)
This can be racy. In the TestReceiveChunkStorageFailure, when we have a storage write failure, we request the chunk again, but the peer has sometimes already sent it, and we return from the connection read loop with unexpected chunk after receiving it twice.

peerconn.go
peerconn_test.go

index a2652da9365c2b4419f4abf7ca235446fc95c5b5..8933dc77036fa1777ba716e6cef63df1c63ee468 100644 (file)
@@ -81,7 +81,7 @@ type PeerConn struct {
        // Chunks that we might reasonably expect to receive from the peer. Due to
        // latency, buffering, and implementation differences, we may receive
        // chunks that are no longer in the set of requests actually want.
-       validReceiveChunks map[request]struct{}
+       validReceiveChunks map[request]int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
@@ -501,9 +501,9 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool {
        }
        cn.requests[r] = struct{}{}
        if cn.validReceiveChunks == nil {
-               cn.validReceiveChunks = make(map[request]struct{})
+               cn.validReceiveChunks = make(map[request]int)
        }
-       cn.validReceiveChunks[r] = struct{}{}
+       cn.validReceiveChunks[r]++
        cn.t.pendingRequests[r]++
        cn.t.requestStrategy.hooks().sentRequest(r)
        cn.updateExpectingChunks()
@@ -987,6 +987,14 @@ func (c *PeerConn) onReadRequest(r request) error {
        return nil
 }
 
+func runSafeExtraneous(f func()) {
+       if true {
+               go f()
+       } else {
+               f()
+       }
+}
+
 // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
 // exit. Returning will end the connection.
 func (c *PeerConn) mainReadLoop() (err error) {
@@ -1026,6 +1034,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                }
                messageTypesReceived.Add(msg.Type.String(), 1)
                if msg.Type.FastExtension() && !c.fastEnabled() {
+                       runSafeExtraneous(func() { torrent.Add("fast messages received when extension is disabled", 1) })
                        return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
                }
                switch msg.Type {
@@ -1090,7 +1099,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        err = c.peerSentHaveNone()
                case pp.Reject:
                        c.deleteRequest(newRequestFromMessage(&msg))
-                       delete(c.validReceiveChunks, newRequestFromMessage(&msg))
+                       c.decExpectedChunkReceive(newRequestFromMessage(&msg))
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@@ -1107,6 +1116,17 @@ func (c *PeerConn) mainReadLoop() (err error) {
        }
 }
 
+func (c *PeerConn) decExpectedChunkReceive(r request) {
+       count := c.validReceiveChunks[r]
+       if count == 1 {
+               delete(c.validReceiveChunks, r)
+       } else if count > 1 {
+               c.validReceiveChunks[r] = count - 1
+       } else {
+               panic(r)
+       }
+}
+
 func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
        defer func() {
                // TODO: Should we still do this?
@@ -1196,11 +1216,11 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
                torrent.Add("chunks received while choking", 1)
        }
 
-       if _, ok := c.validReceiveChunks[req]; !ok {
+       if c.validReceiveChunks[req] <= 0 {
                torrent.Add("chunks received unexpected", 1)
                return errors.New("received unexpected chunk")
        }
-       delete(c.validReceiveChunks, req)
+       c.decExpectedChunkReceive(req)
 
        if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) {
                torrent.Add("chunks received due to allowed fast", 1)
index 439f533ab9801e2778a752b031ccd09be5ec2728..61d740547097363853209a8b2d618582b89a58c2 100644 (file)
@@ -132,7 +132,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
                        // The chunk must be written to storage everytime, to ensure the
                        // writeSem is unlocked.
                        t.pieces[0]._dirtyChunks.Clear()
-                       cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}}
+                       cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1}
                        cl.unlock()
                        n, err := w.Write(wb)
                        require.NoError(b, err)