]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Wait for cancelled requests to be rejected per the spec
authorMatt Joiner <anacrolix@gmail.com>
Tue, 12 Oct 2021 23:16:56 +0000 (10:16 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 19 Oct 2021 03:08:56 +0000 (14:08 +1100)
peer-impl.go
peerconn.go
requesting.go
webseed-peer.go

index 1dc154f705632eaa77da418aef825b962b48647a..23ced725fbcac4fa6d8c77dc9095e8baeae053f8 100644 (file)
@@ -14,7 +14,7 @@ type peerImpl interface {
 
        // Neither of these return buffer room anymore, because they're currently both posted. There's
        // also PeerConn.writeBufferFull for when/where it matters.
-       _cancel(Request) bool
+       _cancel(RequestIndex) bool
        _request(Request) bool
 
        connectionFlags() string
index 009d9a7dbf5b626bdffdb46d5c6262fdca649cd7..aa03a741c33a5770ac2f067635fcff46f28140f1 100644 (file)
@@ -84,6 +84,7 @@ type Peer struct {
        // Stuff controlled by the local peer.
        needRequestUpdate    string
        actualRequestState   requestState
+       cancelledRequests    roaring.Bitmap
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -607,17 +608,28 @@ func (me *PeerConn) _request(r Request) bool {
 }
 
 func (me *Peer) cancel(r RequestIndex) bool {
-       if me.deleteRequest(r) {
+       if !me.actualRequestState.Requests.Contains(r) {
+               return true
+       }
+       return me._cancel(r)
+}
+
+func (me *PeerConn) _cancel(r RequestIndex) bool {
+       if me.cancelledRequests.Contains(r) {
+               // Already cancelled and waiting for a response.
+               return true
+       }
+       if me.fastEnabled() {
+               me.cancelledRequests.Add(r)
+       } else {
+               if !me.deleteRequest(r) {
+                       panic("request not existing should have been guarded")
+               }
                if me.actualRequestState.Requests.GetCardinality() == 0 {
                        me.updateRequests("Peer.cancel")
                }
-               return me.peerImpl._cancel(me.t.requestIndexToRequest(r))
        }
-       return true
-}
-
-func (me *PeerConn) _cancel(r Request) bool {
-       return me.write(makeCancelMessage(r))
+       return me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
@@ -1299,6 +1311,9 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                        if !c.peerChoking {
                                c._chunksReceivedWhileExpecting++
                        }
+                       if c.actualRequestState.Requests.GetCardinality() == 0 {
+                               c.updateRequests("Peer.receiveChunk deleted request")
+                       }
                } else {
                        chunksReceived.Add("unwanted", 1)
                }
@@ -1320,9 +1335,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        if deletedRequest {
                c.piecesReceivedSinceLastRequestUpdate++
-               if c.actualRequestState.Requests.GetCardinality() == 0 {
-                       c.updateRequests("Peer.receiveChunk deleted request")
-               }
                c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
        }
        for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
@@ -1490,6 +1502,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        if !c.actualRequestState.Requests.CheckedRemove(r) {
                return false
        }
+       c.cancelledRequests.Remove(r)
        for _, f := range c.callbacks.DeletedRequest {
                f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
        }
index c1575885d281605823bceba42eb584578b62b396..0ec4151b0c3e414fc1ab6bce3762eeb8d5f70745 100644 (file)
@@ -11,7 +11,6 @@ import (
 
        "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/log"
-       "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
 
        request_strategy "github.com/anacrolix/torrent/request-strategy"
@@ -268,18 +267,23 @@ func (p *Peer) applyRequestState(next requestState) bool {
                return false
        }
        next.Requests.Iterate(func(req uint32) bool {
-               // This could happen if the peer chokes us between the next state being generated, and us
-               // trying to transmit the state.
-               if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
-                       return true
+               if p.cancelledRequests.Contains(req) {
+                       log.Printf("waiting for cancelled request %v", req)
+                       return false
+               }
+               if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
+                       log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]",
+                               current.Requests.GetCardinality(),
+                               p.cancelledRequests.GetCardinality(),
+                               p.nominalMaxRequests(),
+                       )
+                       return false
                }
                var err error
                more, err = p.request(req)
                if err != nil {
                        panic(err)
-               } /* else {
-                       log.Print(req)
-               } */
+               }
                return more
        })
        if more {
index ebdd8dba9ef562a50e2a59b591d596916a9860c8..d2c466816b4c405d568a61991450a75e060ae206 100644 (file)
@@ -45,10 +45,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) _cancel(r Request) bool {
-       active, ok := ws.activeRequests[r]
+func (ws *webseedPeer) _cancel(r RequestIndex) bool {
+       active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
        if ok {
                active.Cancel()
+               if !ws.peer.deleteRequest(r) {
+                       panic("cancelled webseed request should exist")
+               }
+               if ws.peer.actualRequestState.Requests.GetCardinality() == 0 {
+                       ws.peer.updateRequests("webseedPeer._cancel")
+               }
        }
        return true
 }