]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Don't automatically delete requests if we're choked with fast extension
authorMatt Joiner <anacrolix@gmail.com>
Mon, 11 Oct 2021 07:21:24 +0000 (18:21 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 19 Oct 2021 03:08:56 +0000 (14:08 +1100)
peerconn.go
requesting.go

index c54be39faa51aea43335678606fb11986fd2b7f6..a49b6a05cf8643975df0584061573d8d00b0be2a 100644 (file)
@@ -1035,12 +1035,28 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        c.peerChoking = true
                        if !c.fastEnabled() {
                                c.deleteAllRequests()
+                       } else {
+                               c.actualRequestState.Requests.Iterate(func(x uint32) bool {
+                                       if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
+                                               c.t.pendingRequests.Dec(x)
+                                       }
+                                       return true
+                               })
                        }
                        // We can then reset our interest.
                        c.updateRequests("choked")
                        c.updateExpectingChunks()
                case pp.Unchoke:
+                       if !c.peerChoking {
+                               return errors.New("got unchoke but not choked")
+                       }
                        c.peerChoking = false
+                       c.actualRequestState.Requests.Iterate(func(x uint32) bool {
+                               if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
+                                       c.t.pendingRequests.Inc(x)
+                               }
+                               return true
+                       })
                        c.updateRequests("unchoked")
                        c.updateExpectingChunks()
                case pp.Interested:
@@ -1098,7 +1114,15 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
-                       c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index))
+                       pieceIndex := msg.Index.Int()
+                       c.peerAllowedFast.AddInt(pieceIndex)
+                       n := roaringBitmapRangeCardinality(
+                               &c.actualRequestState.Requests,
+                               t.pieceRequestIndexOffset(pieceIndex),
+                               t.pieceRequestIndexOffset(pieceIndex+1))
+                       if n != 0 {
+                               panic(n)
+                       }
                        c.updateRequests("allowed fast")
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
@@ -1444,7 +1468,9 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
                f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
        }
        c.updateExpectingChunks()
-       c.t.pendingRequests.Dec(r)
+       if !c.peerChoking || c.peerAllowedFast.Contains(r/c.t.chunksPerRegularPiece()) {
+               c.t.pendingRequests.Dec(r)
+       }
        return true
 }
 
index 737771fec97a64ee0dda497b832c0a05c579ada0..027365e381534621eac280b1048633fc4ca49184 100644 (file)
@@ -147,13 +147,22 @@ func (p peerRequests) Less(i, j int) bool {
                return ret
        }
        ml := multiless.New()
+       // Push requests that can't be served right now to the end. But we don't throw them away unless
+       // there's a better alternative. This is for when we're using the fast extension and get choked
+       // but our requests could still be good when we get unchoked.
+       if p.peer.peerChoking {
+               ml = ml.Bool(
+                       !p.peer.peerAllowedFast.Contains(leftPieceIndex),
+                       !p.peer.peerAllowedFast.Contains(rightPieceIndex),
+               )
+       }
        ml = ml.Int(
                pending(leftRequest, leftCurrent),
                pending(rightRequest, rightCurrent))
        ml = ml.Bool(rightCurrent, leftCurrent)
        ml = ml.Int(
-               int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
-               int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
+               int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
+               int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority))
        ml = ml.Int(
                int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
                int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
@@ -198,15 +207,15 @@ func (p *Peer) getDesiredRequestState() (desired requestState) {
                                return
                        }
                        allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
-                       if !allowedFast {
-                               // We must signal interest to request this piece.
-                               desired.Interested = true
-                               if p.peerChoking {
-                                       // We can't request from this piece right now then.
-                                       return
-                               }
-                       }
                        rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
+                               if !allowedFast {
+                                       // We must signal interest to request this..
+                                       desired.Interested = true
+                                       if p.peerChoking && !p.actualRequestState.Requests.Contains(ci) {
+                                               // We can't request this right now.
+                                               return
+                                       }
+                               }
                                requestHeap.requestIndexes = append(
                                        requestHeap.requestIndexes,
                                        p.t.pieceRequestIndexOffset(pieceIndex)+ci)