]> Sergey Matveev's repositories - btrtrc.git/commitdiff
More consistent cancellation management
authorMatt Joiner <anacrolix@gmail.com>
Sat, 11 Dec 2021 13:04:06 +0000 (00:04 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 12 Dec 2021 07:35:02 +0000 (18:35 +1100)
peer-impl.go
peerconn.go
request-strategy/peer.go
requesting.go
torrent.go
webseed-peer.go

index d400d9a4c9790b9b35d72234bf19bf0e5c7adfd2..47b4345ae628f629003fde3da6848090edaccf23 100644 (file)
@@ -15,9 +15,9 @@ type peerImpl interface {
        isLowOnRequests() bool
        writeInterested(interested bool) bool
 
-       // Neither of these return buffer room anymore, because they're currently both posted. There's
-       // also PeerConn.writeBufferFull for when/where it matters.
-       _cancel(RequestIndex)
+       // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
+       // handled by a follow-up event.
+       _cancel(RequestIndex) (acked bool)
        _request(Request) bool
        connectionFlags() string
        onClose()
index f36da1c03f898d9cfd4176e48d0a44322783bc5d..098d93fc46a6ba8f59833eaa9dfbce07200ae6e9 100644 (file)
@@ -52,7 +52,7 @@ type PeerRemoteAddr interface {
 // indexable with the memory space available.
 type (
        maxRequests  = int
-       requestState = request_strategy.PeerNextRequestState
+       requestState = request_strategy.PeerRequestState
 )
 
 type Peer struct {
@@ -84,9 +84,8 @@ type Peer struct {
 
        // Stuff controlled by the local peer.
        needRequestUpdate    string
-       actualRequestState   requestState
+       requestState         requestState
        updateRequestsTimer  *time.Timer
-       cancelledRequests    roaring.Bitmap
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -97,9 +96,9 @@ type Peer struct {
        choking                                bool
        piecesReceivedSinceLastRequestUpdate   maxRequests
        maxPiecesReceivedBetweenRequestUpdates maxRequests
-       // Chunks that we might reasonably expect to receive from the peer. Due to
-       // latency, buffering, and implementation differences, we may receive
-       // chunks that are no longer in the set of requests actually want.
+       // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+       // and implementation differences, we may receive chunks that are no longer in the set of
+       // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
        validReceiveChunks map[RequestIndex]int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
@@ -177,10 +176,10 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       if cn.actualRequestState.Requests.IsEmpty() {
+       if cn.requestState.Requests.IsEmpty() {
                return false
        }
-       if !cn.actualRequestState.Interested {
+       if !cn.requestState.Interested {
                return false
        }
        if !cn.peerChoking {
@@ -189,7 +188,7 @@ func (cn *Peer) expectingChunks() bool {
        haveAllowedFastRequests := false
        cn.peerAllowedFast.Iterate(func(i uint32) bool {
                haveAllowedFastRequests = roaringBitmapRangeCardinality(
-                       &cn.actualRequestState.Requests,
+                       &cn.requestState.Requests,
                        cn.t.pieceRequestIndexOffset(pieceIndex(i)),
                        cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
                ) == 0
@@ -230,7 +229,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
 
 func (cn *Peer) cumInterest() time.Duration {
        ret := cn.priorInterest
-       if cn.actualRequestState.Interested {
+       if cn.requestState.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
@@ -318,7 +317,7 @@ func (cn *Peer) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
-       if cn.actualRequestState.Interested {
+       if cn.requestState.Interested {
                c('i')
        }
        if cn.choking {
@@ -346,7 +345,7 @@ func (cn *Peer) downloadRate() float64 {
 
 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
        ret = make(map[pieceIndex]int)
-       cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
+       cn.requestState.Requests.Iterate(func(x uint32) bool {
                ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
                return true
        })
@@ -373,14 +372,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+               "    %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               cn.actualRequestState.Requests.GetCardinality(),
-               cn.cancelledRequests.GetCardinality(),
+               cn.requestState.Requests.GetCardinality(),
+               cn.requestState.Cancelled.GetCardinality(),
                cn.nominalMaxRequests(),
                cn.PeerMaxRequests,
                len(cn.peerRequests),
@@ -537,10 +536,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
 }
 
 func (cn *Peer) setInterested(interested bool) bool {
-       if cn.actualRequestState.Interested == interested {
+       if cn.requestState.Interested == interested {
                return true
        }
-       cn.actualRequestState.Interested = interested
+       cn.requestState.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
@@ -589,7 +588,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
                // This could occur if we made a request with the fast extension, and then got choked and
                // haven't had the request rejected yet.
-               if !cn.actualRequestState.Requests.Contains(r) {
+               if !cn.requestState.Requests.Contains(r) {
                        panic("peer choking and piece not allowed fast")
                }
        }
@@ -608,13 +607,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
-       if cn.actualRequestState.Requests.Contains(r) {
+       if cn.requestState.Requests.Contains(r) {
                return true, nil
        }
-       if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
                return true, errors.New("too many outstanding requests")
        }
-       cn.actualRequestState.Requests.Add(r)
+       cn.requestState.Requests.Add(r)
        if cn.validReceiveChunks == nil {
                cn.validReceiveChunks = make(map[RequestIndex]int)
        }
@@ -639,30 +638,24 @@ func (me *PeerConn) _request(r Request) bool {
 }
 
 func (me *Peer) cancel(r RequestIndex) {
-       if !me.actualRequestState.Requests.Contains(r) {
-               panic(r)
+       if !me.deleteRequest(r) {
+               panic("request not existing should have been guarded")
+       }
+       if me._cancel(r) {
+               if !me.requestState.Cancelled.CheckedAdd(r) {
+                       panic("request already cancelled")
+               }
+       }
+       if me.isLowOnRequests() {
+               me.updateRequests("Peer.cancel")
        }
-       me._cancel(r)
 }
 
-func (me *PeerConn) _cancel(r RequestIndex) {
-       if me.cancelledRequests.Contains(r) {
-               // Already cancelled and waiting for a response.
-               panic(r)
-       }
+func (me *PeerConn) _cancel(r RequestIndex) bool {
+       me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
        // Transmission does not send rejects for received cancels. See
        // https://github.com/transmission/transmission/pull/2275.
-       if me.fastEnabled() && !me.remoteIsTransmission() {
-               me.cancelledRequests.Add(r)
-       } else {
-               if !me.deleteRequest(r) {
-                       panic("request not existing should have been guarded")
-               }
-               if me.isLowOnRequests() {
-                       me.updateRequests("Peer.cancel")
-               }
-       }
-       me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
+       return me.fastEnabled() && !me.remoteIsTransmission()
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
@@ -1102,18 +1095,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
                                c.deleteAllRequests()
                        } else {
                                // We don't decrement pending requests here, let's wait for the peer to either
-                               // reject or satisfy the outstanding requests. Additionally some peers may unchoke
+                               // reject or satisfy the outstanding requests. Additionally, some peers may unchoke
                                // us and resume where they left off, we don't want to have piled on to those chunks
-                               // in the meanwhile. I think a peers ability to abuse this should be limited: they
+                               // in the meanwhile. I think a peer's ability to abuse this should be limited: they
                                // could let us request a lot of stuff, then choke us and never reject, but they're
                                // only a single peer, our chunk balancing should smooth over this abuse.
                        }
                        c.peerChoking = true
-                       // We can now reset our interest. I think we do this after setting the flag in case the
-                       // peerImpl updates synchronously (webseeds?).
-                       if !c.actualRequestState.Requests.IsEmpty() {
-                               c.updateRequests("choked")
-                       }
                        c.updateExpectingChunks()
                case pp.Unchoke:
                        if !c.peerChoking {
@@ -1124,7 +1112,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        }
                        c.peerChoking = false
                        preservedCount := 0
-                       c.actualRequestState.Requests.Iterate(func(x uint32) bool {
+                       c.requestState.Requests.Iterate(func(x uint32) bool {
                                if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
                                        preservedCount++
                                }
@@ -1194,7 +1182,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Reject:
-                       c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
+                       req := newRequestFromMessage(&msg)
+                       if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
+                               log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
+                               err = fmt.Errorf("received invalid reject [request=%v]", req)
+                       }
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@@ -1210,13 +1202,16 @@ func (c *PeerConn) mainReadLoop() (err error) {
        }
 }
 
-func (c *Peer) remoteRejectedRequest(r RequestIndex) {
-       if c.deleteRequest(r) {
-               if c.isLowOnRequests() {
-                       c.updateRequests("Peer.remoteRejectedRequest")
-               }
-               c.decExpectedChunkReceive(r)
+// Returns true if it was valid to reject the request.
+func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+       if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+               return false
+       }
+       if c.isLowOnRequests() {
+               c.updateRequests("Peer.remoteRejectedRequest")
        }
+       c.decExpectedChunkReceive(r)
+       return true
 }
 
 func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
@@ -1342,16 +1337,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
        // have actually already received the piece, while we have the Client unlocked to write the data
        // out.
-       deletedRequest := false
+       intended := false
        {
-               if c.actualRequestState.Requests.Contains(req) {
+               if c.requestState.Requests.Contains(req) {
                        for _, f := range c.callbacks.ReceivedRequested {
                                f(PeerMessageEvent{c, msg})
                        }
                }
                // Request has been satisfied.
-               if c.deleteRequest(req) {
-                       deletedRequest = true
+               if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
+                       intended = true
                        if !c.peerChoking {
                                c._chunksReceivedWhileExpecting++
                        }
@@ -1359,7 +1354,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                                c.updateRequests("Peer.receiveChunk deleted request")
                        }
                } else {
-                       chunksReceived.Add("unwanted", 1)
+                       chunksReceived.Add("unintended", 1)
                }
        }
 
@@ -1369,7 +1364,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // Do we actually want this chunk?
        if t.haveChunk(ppReq) {
                // panic(fmt.Sprintf("%+v", ppReq))
-               chunksReceived.Add("wasted", 1)
+               chunksReceived.Add("redundant", 1)
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
                return nil
        }
@@ -1378,7 +1373,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
-       if deletedRequest {
+       if intended {
                c.piecesReceivedSinceLastRequestUpdate++
                c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
        }
@@ -1541,29 +1536,33 @@ func (c *Peer) peerHasWantedPieces() bool {
        return c.peerPieces().Intersects(&c.t._pendingPieces)
 }
 
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
 func (c *Peer) deleteRequest(r RequestIndex) bool {
-       if !c.actualRequestState.Requests.CheckedRemove(r) {
+       if !c.requestState.Requests.CheckedRemove(r) {
                return false
        }
-       c.cancelledRequests.Remove(r)
        for _, f := range c.callbacks.DeletedRequest {
                f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
        }
        c.updateExpectingChunks()
-       if c.t.requestingPeer(r) == c {
-               delete(c.t.pendingRequests, r)
-               delete(c.t.lastRequested, r)
+       if c.t.requestingPeer(r) != c {
+               panic("only one peer should have a given request at a time")
        }
+       delete(c.t.pendingRequests, r)
+       delete(c.t.lastRequested, r)
        return true
 }
 
 func (c *Peer) deleteAllRequests() {
-       c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
-               c.deleteRequest(x)
+       c.requestState.Requests.Clone().Iterate(func(x uint32) bool {
+               if !c.deleteRequest(x) {
+                       panic("request should exist")
+               }
                return true
        })
-       if !c.actualRequestState.Requests.IsEmpty() {
-               panic(c.actualRequestState.Requests.GetCardinality())
+       if !c.requestState.Requests.IsEmpty() {
+               panic(c.requestState.Requests.GetCardinality())
        }
 }
 
@@ -1693,11 +1692,11 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
 }
 
 func (pc *PeerConn) isLowOnRequests() bool {
-       return pc.actualRequestState.Requests.IsEmpty()
+       return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
 }
 
 func (p *Peer) uncancelledRequests() uint64 {
-       return p.actualRequestState.Requests.GetCardinality() - p.cancelledRequests.GetCardinality()
+       return p.requestState.Requests.GetCardinality()
 }
 
 func (pc *PeerConn) remoteIsTransmission() bool {
index be87063665911cf79422aaa794f6c965183e4b1f..6a69535f955c70fd90f9721051e2971de7cb5298 100644 (file)
@@ -4,7 +4,10 @@ import (
        "github.com/RoaringBitmap/roaring"
 )
 
-type PeerNextRequestState struct {
+type PeerRequestState struct {
        Interested bool
-       Requests   roaring.Bitmap
+       // Expecting
+       Requests roaring.Bitmap
+       // Cancelled and waiting response
+       Cancelled roaring.Bitmap
 }
index c1ffa69762f4e7be678a0284495a428d4eca67c1..a3a7e1c3a9d5c5b35b7885274ac021d83193a41d 100644 (file)
@@ -101,8 +101,8 @@ func (p *peerRequests) Less(i, j int) bool {
        if leftPeer != nil {
                // The right peer should also be set, or we'd have resolved the computation by now.
                ml = ml.Uint64(
-                       rightPeer.actualRequestState.Requests.GetCardinality(),
-                       leftPeer.actualRequestState.Requests.GetCardinality(),
+                       rightPeer.requestState.Requests.GetCardinality(),
+                       leftPeer.requestState.Requests.GetCardinality(),
                )
                // Could either of the lastRequested be Zero? That's what checking an existing peer is for.
                leftLast := t.lastRequested[leftRequest]
@@ -171,7 +171,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                        allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
                        p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
                                r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
-                               // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
+                               // if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) {
                                //      return
                                // }
                                if !allowedFast {
@@ -183,11 +183,15 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                                        // have made the request previously (presumably while unchoked), and haven't had
                                        // the peer respond yet (and the request was retained because we are using the
                                        // fast extension).
-                                       if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
+                                       if p.peerChoking && !p.requestState.Requests.Contains(r) {
                                                // We can't request this right now.
                                                return
                                        }
                                }
+                               if p.requestState.Cancelled.Contains(r) {
+                                       // Can't re-request.
+                                       return
+                               }
                                requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
                        })
                },
@@ -215,7 +219,7 @@ func (p *Peer) maybeUpdateActualRequestState() bool {
 
 // Transmit/action the request state to the peer.
 func (p *Peer) applyRequestState(next desiredRequestState) bool {
-       current := &p.actualRequestState
+       current := &p.requestState
        if !p.setInterested(next.Interested) {
                return false
        }
@@ -225,11 +229,6 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
        heap.Init(requestHeap)
        for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
                req := heap.Pop(requestHeap).(RequestIndex)
-               if p.cancelledRequests.Contains(req) {
-                       // Waiting for a reject or piece message, which will suitably trigger us to update our
-                       // requests, so we can skip this one with no additional consideration.
-                       continue
-               }
                existing := t.requestingPeer(req)
                if existing != nil && existing != p {
                        // Don't steal from the poor.
index ade7b9338d7771dbec7190ce5ab9d818c97ac3a9..508de54c34ba4654ed7459076b813daa8880fe50 100644 (file)
@@ -1099,7 +1099,7 @@ func (t *Torrent) maybeNewConns() {
 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
        if t._pendingPieces.Contains(uint32(piece)) {
                t.iterPeers(func(c *Peer) {
-                       // if c.actualRequestState.Interested {
+                       // if c.requestState.Interested {
                        //      return
                        // }
                        if !c.isLowOnRequests() {
@@ -1420,7 +1420,7 @@ func (t *Torrent) assertPendingRequests() {
        //      actual.m = make([]int, t.numRequests())
        // }
        // t.iterPeers(func(p *Peer) {
-       //      p.actualRequestState.Requests.Iterate(func(x uint32) bool {
+       //      p.requestState.Requests.Iterate(func(x uint32) bool {
        //              actual.Inc(x)
        //              return true
        //      })
index 0cf6c7daa364e560db79a5ddb65da2d35f6ec7c7..221aa53fe71cca0253759de957185a5021f43140 100644 (file)
@@ -48,17 +48,14 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) _cancel(r RequestIndex) {
-       active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
-       if ok {
+func (ws *webseedPeer) _cancel(r RequestIndex) bool {
+       if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
                active.Cancel()
+               // The requester is running and will handle the result.
+               return true
        }
-       if !ws.peer.deleteRequest(r) {
-               panic("cancelled webseed request should exist")
-       }
-       if ws.peer.isLowOnRequests() {
-               ws.peer.updateRequests("webseedPeer._cancel")
-       }
+       // There should be no requester handling this, so no further events will occur.
+       return false
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
@@ -88,7 +85,7 @@ func (ws *webseedPeer) requester(i int) {
 start:
        for !ws.peer.closed.IsSet() {
                restart := false
-               ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+               ws.peer.requestState.Requests.Iterate(func(x uint32) bool {
                        r := ws.peer.t.requestIndexToRequest(x)
                        if _, ok := ws.activeRequests[r]; ok {
                                return true
@@ -170,7 +167,9 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        log.Printf("closing %v", ws)
                        ws.peer.close()
                }
-               ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
+               if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+                       panic("invalid reject")
+               }
                return err
        }
        err = ws.peer.receiveChunk(&pp.Message{
@@ -186,7 +185,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
 }
 
 func (me *webseedPeer) isLowOnRequests() bool {
-       return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
+       return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
 }
 
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {