From 04beb8937fbebe3b651d69fadcd918fe401d4678 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 12 Dec 2021 00:04:06 +1100 Subject: [PATCH] More consistent cancellation management --- peer-impl.go | 6 +- peerconn.go | 145 +++++++++++++++++++-------------------- request-strategy/peer.go | 7 +- requesting.go | 19 +++-- torrent.go | 4 +- webseed-peer.go | 23 +++---- 6 files changed, 102 insertions(+), 102 deletions(-) diff --git a/peer-impl.go b/peer-impl.go index d400d9a4..47b4345a 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -15,9 +15,9 @@ type peerImpl interface { isLowOnRequests() bool writeInterested(interested bool) bool - // Neither of these return buffer room anymore, because they're currently both posted. There's - // also PeerConn.writeBufferFull for when/where it matters. - _cancel(RequestIndex) + // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be + // handled by a follow-up event. + _cancel(RequestIndex) (acked bool) _request(Request) bool connectionFlags() string onClose() diff --git a/peerconn.go b/peerconn.go index f36da1c0..098d93fc 100644 --- a/peerconn.go +++ b/peerconn.go @@ -52,7 +52,7 @@ type PeerRemoteAddr interface { // indexable with the memory space available. type ( maxRequests = int - requestState = request_strategy.PeerNextRequestState + requestState = request_strategy.PeerRequestState ) type Peer struct { @@ -84,9 +84,8 @@ type Peer struct { // Stuff controlled by the local peer. needRequestUpdate string - actualRequestState requestState + requestState requestState updateRequestsTimer *time.Timer - cancelledRequests roaring.Bitmap lastBecameInterested time.Time priorInterest time.Duration @@ -97,9 +96,9 @@ type Peer struct { choking bool piecesReceivedSinceLastRequestUpdate maxRequests maxPiecesReceivedBetweenRequestUpdates maxRequests - // Chunks that we might reasonably expect to receive from the peer. Due to - // latency, buffering, and implementation differences, we may receive - // chunks that are no longer in the set of requests actually want. + // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering, + // and implementation differences, we may receive chunks that are no longer in the set of + // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable. validReceiveChunks map[RequestIndex]int // Indexed by metadata piece, set to true if posted and pending a // response. 
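
A condensed sketch of the contract introduced above, in case the split across hunks makes it hard to follow: _cancel now reports whether a follow-up event (a reject or the requested chunk) will eventually clear the request, and the caller parks acknowledged cancels in the new Cancelled bitmap instead of the removed cancelledRequests field. The types below are simplified stand-ins assembled from this patch, and fakeConn is a hypothetical implementer rather than the library's PeerConn; the real Peer.cancel also routes removal through deleteRequest and may trigger updateRequests.

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

type RequestIndex = uint32

// peerImpl._cancel now returns acked: true means a follow-up event (a reject or the
// requested chunk) is still expected to arrive and clear the request.
type peerImpl interface {
	_cancel(RequestIndex) (acked bool)
}

// requestState mirrors the renamed request_strategy.PeerRequestState.
type requestState struct {
	Interested bool
	// Requests we are still expecting the peer to answer.
	Requests roaring.Bitmap
	// Requests we cancelled that still owe us a reject or a chunk.
	Cancelled roaring.Bitmap
}

type peer struct {
	impl         peerImpl
	requestState requestState
}

// cancel condenses the new Peer.cancel: the index always leaves Requests immediately,
// and is parked in Cancelled only when the impl promises a follow-up event.
func (p *peer) cancel(r RequestIndex) {
	if !p.requestState.Requests.CheckedRemove(r) {
		panic("request not existing should have been guarded")
	}
	if p.impl._cancel(r) {
		if !p.requestState.Cancelled.CheckedAdd(r) {
			panic("request already cancelled")
		}
	}
}

// fakeConn stands in for PeerConn: a BEP 6 peer acknowledges cancels with rejects, so
// _cancel reports acked when the fast extension is in play (hard-coded for the demo).
type fakeConn struct{ fastEnabled bool }

func (c *fakeConn) _cancel(RequestIndex) bool { return c.fastEnabled }

func main() {
	p := peer{impl: &fakeConn{fastEnabled: true}}
	p.requestState.Requests.Add(3)
	p.cancel(3)
	fmt.Println(p.requestState.Cancelled.Contains(3)) // true: awaiting a reject or chunk
}
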
@@ -177,10 +176,10 @@ func (cn *Peer) updateExpectingChunks() { } func (cn *Peer) expectingChunks() bool { - if cn.actualRequestState.Requests.IsEmpty() { + if cn.requestState.Requests.IsEmpty() { return false } - if !cn.actualRequestState.Interested { + if !cn.requestState.Interested { return false } if !cn.peerChoking { @@ -189,7 +188,7 @@ func (cn *Peer) expectingChunks() bool { haveAllowedFastRequests := false cn.peerAllowedFast.Iterate(func(i uint32) bool { haveAllowedFastRequests = roaringBitmapRangeCardinality( - &cn.actualRequestState.Requests, + &cn.requestState.Requests, cn.t.pieceRequestIndexOffset(pieceIndex(i)), cn.t.pieceRequestIndexOffset(pieceIndex(i+1)), ) == 0 @@ -230,7 +229,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) { func (cn *Peer) cumInterest() time.Duration { ret := cn.priorInterest - if cn.actualRequestState.Interested { + if cn.requestState.Interested { ret += time.Since(cn.lastBecameInterested) } return ret @@ -318,7 +317,7 @@ func (cn *Peer) statusFlags() (ret string) { c := func(b byte) { ret += string([]byte{b}) } - if cn.actualRequestState.Interested { + if cn.requestState.Interested { c('i') } if cn.choking { @@ -346,7 +345,7 @@ func (cn *Peer) downloadRate() float64 { func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { ret = make(map[pieceIndex]int) - cn.actualRequestState.Requests.Iterate(func(x uint32) bool { + cn.requestState.Requests.Iterate(func(x uint32) bool { ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++ return true }) @@ -373,14 +372,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.totalExpectingTime(), ) fmt.Fprintf(w, - " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", + " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, - cn.actualRequestState.Requests.GetCardinality(), - cn.cancelledRequests.GetCardinality(), + cn.requestState.Requests.GetCardinality(), + cn.requestState.Cancelled.GetCardinality(), cn.nominalMaxRequests(), cn.PeerMaxRequests, len(cn.peerRequests), @@ -537,10 +536,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool { } func (cn *Peer) setInterested(interested bool) bool { - if cn.actualRequestState.Interested == interested { + if cn.requestState.Interested == interested { return true } - cn.actualRequestState.Interested = interested + cn.requestState.Interested = interested if interested { cn.lastBecameInterested = time.Now() } else if !cn.lastBecameInterested.IsZero() { @@ -589,7 +588,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) { // This could occur if we made a request with the fast extension, and then got choked and // haven't had the request rejected yet. 
- if !cn.actualRequestState.Requests.Contains(r) { + if !cn.requestState.Requests.Contains(r) { panic("peer choking and piece not allowed fast") } } @@ -608,13 +607,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { if err := cn.shouldRequest(r); err != nil { panic(err) } - if cn.actualRequestState.Requests.Contains(r) { + if cn.requestState.Requests.Contains(r) { return true, nil } - if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { + if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { return true, errors.New("too many outstanding requests") } - cn.actualRequestState.Requests.Add(r) + cn.requestState.Requests.Add(r) if cn.validReceiveChunks == nil { cn.validReceiveChunks = make(map[RequestIndex]int) } @@ -639,30 +638,24 @@ func (me *PeerConn) _request(r Request) bool { } func (me *Peer) cancel(r RequestIndex) { - if !me.actualRequestState.Requests.Contains(r) { - panic(r) + if !me.deleteRequest(r) { + panic("request not existing should have been guarded") + } + if me._cancel(r) { + if !me.requestState.Cancelled.CheckedAdd(r) { + panic("request already cancelled") + } + } + if me.isLowOnRequests() { + me.updateRequests("Peer.cancel") } - me._cancel(r) } -func (me *PeerConn) _cancel(r RequestIndex) { - if me.cancelledRequests.Contains(r) { - // Already cancelled and waiting for a response. - panic(r) - } +func (me *PeerConn) _cancel(r RequestIndex) bool { + me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) // Transmission does not send rejects for received cancels. See // https://github.com/transmission/transmission/pull/2275. - if me.fastEnabled() && !me.remoteIsTransmission() { - me.cancelledRequests.Add(r) - } else { - if !me.deleteRequest(r) { - panic("request not existing should have been guarded") - } - if me.isLowOnRequests() { - me.updateRequests("Peer.cancel") - } - } - me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) + return me.fastEnabled() && !me.remoteIsTransmission() } func (cn *PeerConn) fillWriteBuffer() { @@ -1102,18 +1095,13 @@ func (c *PeerConn) mainReadLoop() (err error) { c.deleteAllRequests() } else { // We don't decrement pending requests here, let's wait for the peer to either - // reject or satisfy the outstanding requests. Additionally some peers may unchoke + // reject or satisfy the outstanding requests. Additionally, some peers may unchoke // us and resume where they left off, we don't want to have piled on to those chunks - // in the meanwhile. I think a peers ability to abuse this should be limited: they + // in the meanwhile. I think a peer's ability to abuse this should be limited: they // could let us request a lot of stuff, then choke us and never reject, but they're // only a single peer, our chunk balancing should smooth over this abuse. } c.peerChoking = true - // We can now reset our interest. I think we do this after setting the flag in case the - // peerImpl updates synchronously (webseeds?). 
- if !c.actualRequestState.Requests.IsEmpty() { - c.updateRequests("choked") - } c.updateExpectingChunks() case pp.Unchoke: if !c.peerChoking { @@ -1124,7 +1112,7 @@ func (c *PeerConn) mainReadLoop() (err error) { } c.peerChoking = false preservedCount := 0 - c.actualRequestState.Requests.Iterate(func(x uint32) bool { + c.requestState.Requests.Iterate(func(x uint32) bool { if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) { preservedCount++ } @@ -1194,7 +1182,11 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.HaveNone: err = c.peerSentHaveNone() case pp.Reject: - c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg))) + req := newRequestFromMessage(&msg) + if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) { + log.Printf("received invalid reject [request=%v, peer=%v]", req, c) + err = fmt.Errorf("received invalid reject [request=%v]", req) + } case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) @@ -1210,13 +1202,16 @@ func (c *PeerConn) mainReadLoop() (err error) { } } -func (c *Peer) remoteRejectedRequest(r RequestIndex) { - if c.deleteRequest(r) { - if c.isLowOnRequests() { - c.updateRequests("Peer.remoteRejectedRequest") - } - c.decExpectedChunkReceive(r) +// Returns true if it was valid to reject the request. +func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { + if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) { + return false + } + if c.isLowOnRequests() { + c.updateRequests("Peer.remoteRejectedRequest") } + c.decExpectedChunkReceive(r) + return true } func (c *Peer) decExpectedChunkReceive(r RequestIndex) { @@ -1342,16 +1337,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // The request needs to be deleted immediately to prevent cancels occurring asynchronously when // have actually already received the piece, while we have the Client unlocked to write the data // out. - deletedRequest := false + intended := false { - if c.actualRequestState.Requests.Contains(req) { + if c.requestState.Requests.Contains(req) { for _, f := range c.callbacks.ReceivedRequested { f(PeerMessageEvent{c, msg}) } } // Request has been satisfied. - if c.deleteRequest(req) { - deletedRequest = true + if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) { + intended = true if !c.peerChoking { c._chunksReceivedWhileExpecting++ } @@ -1359,7 +1354,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.updateRequests("Peer.receiveChunk deleted request") } } else { - chunksReceived.Add("unwanted", 1) + chunksReceived.Add("unintended", 1) } } @@ -1369,7 +1364,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Do we actually want this chunk? 
if t.haveChunk(ppReq) { // panic(fmt.Sprintf("%+v", ppReq)) - chunksReceived.Add("wasted", 1) + chunksReceived.Add("redundant", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } @@ -1378,7 +1373,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) - if deletedRequest { + if intended { c.piecesReceivedSinceLastRequestUpdate++ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } @@ -1541,29 +1536,33 @@ func (c *Peer) peerHasWantedPieces() bool { return c.peerPieces().Intersects(&c.t._pendingPieces) } +// Returns true if an outstanding request is removed. Cancelled requests should be handled +// separately. func (c *Peer) deleteRequest(r RequestIndex) bool { - if !c.actualRequestState.Requests.CheckedRemove(r) { + if !c.requestState.Requests.CheckedRemove(r) { return false } - c.cancelledRequests.Remove(r) for _, f := range c.callbacks.DeletedRequest { f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } c.updateExpectingChunks() - if c.t.requestingPeer(r) == c { - delete(c.t.pendingRequests, r) - delete(c.t.lastRequested, r) + if c.t.requestingPeer(r) != c { + panic("only one peer should have a given request at a time") } + delete(c.t.pendingRequests, r) + delete(c.t.lastRequested, r) return true } func (c *Peer) deleteAllRequests() { - c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool { - c.deleteRequest(x) + c.requestState.Requests.Clone().Iterate(func(x uint32) bool { + if !c.deleteRequest(x) { + panic("request should exist") + } return true }) - if !c.actualRequestState.Requests.IsEmpty() { - panic(c.actualRequestState.Requests.GetCardinality()) + if !c.requestState.Requests.IsEmpty() { + panic(c.requestState.Requests.GetCardinality()) } } @@ -1693,11 +1692,11 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { } func (pc *PeerConn) isLowOnRequests() bool { - return pc.actualRequestState.Requests.IsEmpty() + return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty() } func (p *Peer) uncancelledRequests() uint64 { - return p.actualRequestState.Requests.GetCardinality() - p.cancelledRequests.GetCardinality() + return p.requestState.Requests.GetCardinality() } func (pc *PeerConn) remoteIsTransmission() bool { diff --git a/request-strategy/peer.go b/request-strategy/peer.go index be870636..6a69535f 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -4,7 +4,10 @@ import ( "github.com/RoaringBitmap/roaring" ) -type PeerNextRequestState struct { +type PeerRequestState struct { Interested bool - Requests roaring.Bitmap + // Expecting + Requests roaring.Bitmap + // Cancelled and waiting response + Cancelled roaring.Bitmap } diff --git a/requesting.go b/requesting.go index c1ffa697..a3a7e1c3 100644 --- a/requesting.go +++ b/requesting.go @@ -101,8 +101,8 @@ func (p *peerRequests) Less(i, j int) bool { if leftPeer != nil { // The right peer should also be set, or we'd have resolved the computation by now. ml = ml.Uint64( - rightPeer.actualRequestState.Requests.GetCardinality(), - leftPeer.actualRequestState.Requests.GetCardinality(), + rightPeer.requestState.Requests.GetCardinality(), + leftPeer.requestState.Requests.GetCardinality(), ) // Could either of the lastRequested be Zero? That's what checking an existing peer is for. 
leftLast := t.lastRequested[leftRequest] @@ -171,7 +171,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) { r := p.t.pieceRequestIndexOffset(pieceIndex) + ci - // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { + // if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) { // return // } if !allowedFast { @@ -183,11 +183,15 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { // have made the request previously (presumably while unchoked), and haven't had // the peer respond yet (and the request was retained because we are using the // fast extension). - if p.peerChoking && !p.actualRequestState.Requests.Contains(r) { + if p.peerChoking && !p.requestState.Requests.Contains(r) { // We can't request this right now. return } } + if p.requestState.Cancelled.Contains(r) { + // Can't re-request. + return + } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) }, @@ -215,7 +219,7 @@ func (p *Peer) maybeUpdateActualRequestState() bool { // Transmit/action the request state to the peer. func (p *Peer) applyRequestState(next desiredRequestState) bool { - current := &p.actualRequestState + current := &p.requestState if !p.setInterested(next.Interested) { return false } @@ -225,11 +229,6 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool { heap.Init(requestHeap) for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() { req := heap.Pop(requestHeap).(RequestIndex) - if p.cancelledRequests.Contains(req) { - // Waiting for a reject or piece message, which will suitably trigger us to update our - // requests, so we can skip this one with no additional consideration. - continue - } existing := t.requestingPeer(req) if existing != nil && existing != p { // Don't steal from the poor. diff --git a/torrent.go b/torrent.go index ade7b933..508de54c 100644 --- a/torrent.go +++ b/torrent.go @@ -1099,7 +1099,7 @@ func (t *Torrent) maybeNewConns() { func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { if t._pendingPieces.Contains(uint32(piece)) { t.iterPeers(func(c *Peer) { - // if c.actualRequestState.Interested { + // if c.requestState.Interested { // return // } if !c.isLowOnRequests() { @@ -1420,7 +1420,7 @@ func (t *Torrent) assertPendingRequests() { // actual.m = make([]int, t.numRequests()) // } // t.iterPeers(func(p *Peer) { - // p.actualRequestState.Requests.Iterate(func(x uint32) bool { + // p.requestState.Requests.Iterate(func(x uint32) bool { // actual.Inc(x) // return true // }) diff --git a/webseed-peer.go b/webseed-peer.go index 0cf6c7da..221aa53f 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -48,17 +48,14 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) _cancel(r RequestIndex) { - active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)] - if ok { +func (ws *webseedPeer) _cancel(r RequestIndex) bool { + if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok { active.Cancel() + // The requester is running and will handle the result. 
+ return true } - if !ws.peer.deleteRequest(r) { - panic("cancelled webseed request should exist") - } - if ws.peer.isLowOnRequests() { - ws.peer.updateRequests("webseedPeer._cancel") - } + // There should be no requester handling this, so no further events will occur. + return false } func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { @@ -88,7 +85,7 @@ func (ws *webseedPeer) requester(i int) { start: for !ws.peer.closed.IsSet() { restart := false - ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool { + ws.peer.requestState.Requests.Iterate(func(x uint32) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true @@ -170,7 +167,9 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re log.Printf("closing %v", ws) ws.peer.close() } - ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) + if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { + panic("invalid reject") + } return err } err = ws.peer.receiveChunk(&pp.Message{ @@ -186,7 +185,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re } func (me *webseedPeer) isLowOnRequests() bool { - return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests) + return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests) } func (me *webseedPeer) peerPieces() *roaring.Bitmap { -- 2.44.0
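
How cancelled requests get settled, pulled together from the remoteRejectedRequest, receiveChunk and isLowOnRequests hunks above. This is a hedged, self-contained paraphrase with simplified stand-in types; the real code goes through Peer.deleteRequest, the registered callbacks, and the ConnStats counters (chunks are now tallied as unintended or redundant rather than unwanted or wasted).

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

type RequestIndex = uint32

type requestState struct {
	Requests  roaring.Bitmap // outstanding requests
	Cancelled roaring.Bitmap // cancelled, still awaiting a reject or chunk
}

// remoteRejected mirrors the new remoteRejectedRequest: a reject is valid only if it
// answers an outstanding or a cancelled-but-unacknowledged request.
func (s *requestState) remoteRejected(r RequestIndex) (valid bool) {
	return s.Requests.CheckedRemove(r) || s.Cancelled.CheckedRemove(r)
}

// receivedChunk mirrors the accounting in receiveChunk: the same two sets decide whether
// an arriving chunk was intended; anything else counts as unintended.
func (s *requestState) receivedChunk(r RequestIndex) (intended bool) {
	return s.Requests.CheckedRemove(r) || s.Cancelled.CheckedRemove(r)
}

// isLowOnRequests now also waits for cancels to drain before triggering request updates.
func (s *requestState) isLowOnRequests() bool {
	return s.Requests.IsEmpty() && s.Cancelled.IsEmpty()
}

func main() {
	var s requestState
	s.Cancelled.Add(7) // a request we cancelled while the fast extension was enabled
	fmt.Println(s.remoteRejected(7)) // true: the reject settles the cancel
	fmt.Println(s.remoteRejected(7)) // false: a duplicate reject is invalid
	fmt.Println(s.isLowOnRequests()) // true: nothing outstanding or pending cancel
}
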
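
On the request-selection side, the reason the cancelled-request skip in applyRequestState could be deleted is that getDesiredRequestState now filters those indexes out before they ever reach the heap. The sketch below shows that guard with hypothetical simplified types; canRequest stands in for the choking and allowed-fast checks the real function performs.

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

type RequestIndex = uint32

type peerState struct {
	Requests  roaring.Bitmap
	Cancelled roaring.Bitmap
}

// desiredRequests keeps only candidates we may actually issue right now.
func (p *peerState) desiredRequests(candidates []RequestIndex, canRequest func(RequestIndex) bool) (out []RequestIndex) {
	for _, r := range candidates {
		if p.Cancelled.Contains(r) {
			// Can't re-request until the peer rejects or satisfies the cancelled request.
			continue
		}
		if !canRequest(r) {
			continue
		}
		out = append(out, r)
	}
	return
}

func main() {
	var p peerState
	p.Cancelled.Add(2)
	fmt.Println(p.desiredRequests([]RequestIndex{1, 2, 3}, func(RequestIndex) bool { return true }))
	// Output: [1 3]; index 2 stays parked until its reject or chunk arrives.
}

Webseeds follow the same contract without the fast extension: webseedPeer._cancel returns true only while an active requester exists, because that requester's error path is what ends up calling remoteRejectedRequest and clearing the Cancelled entry; with no requester running, the cancel is not acked and the request is simply dropped.
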