From 23b4e2dc9c71667c731b3d1606287722b32260f7 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 13 Oct 2021 10:16:56 +1100 Subject: [PATCH] Wait for cancelled requests to be rejected per the spec --- peer-impl.go | 2 +- peerconn.go | 33 +++++++++++++++++++++++---------- requesting.go | 20 ++++++++++++-------- webseed-peer.go | 10 ++++++++-- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/peer-impl.go b/peer-impl.go index 1dc154f7..23ced725 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -14,7 +14,7 @@ type peerImpl interface { // Neither of these return buffer room anymore, because they're currently both posted. There's // also PeerConn.writeBufferFull for when/where it matters. - _cancel(Request) bool + _cancel(RequestIndex) bool _request(Request) bool connectionFlags() string diff --git a/peerconn.go b/peerconn.go index 009d9a7d..aa03a741 100644 --- a/peerconn.go +++ b/peerconn.go @@ -84,6 +84,7 @@ type Peer struct { // Stuff controlled by the local peer. needRequestUpdate string actualRequestState requestState + cancelledRequests roaring.Bitmap lastBecameInterested time.Time priorInterest time.Duration @@ -607,17 +608,28 @@ func (me *PeerConn) _request(r Request) bool { } func (me *Peer) cancel(r RequestIndex) bool { - if me.deleteRequest(r) { + if !me.actualRequestState.Requests.Contains(r) { + return true + } + return me._cancel(r) +} + +func (me *PeerConn) _cancel(r RequestIndex) bool { + if me.cancelledRequests.Contains(r) { + // Already cancelled and waiting for a response. + return true + } + if me.fastEnabled() { + me.cancelledRequests.Add(r) + } else { + if !me.deleteRequest(r) { + panic("request not existing should have been guarded") + } if me.actualRequestState.Requests.GetCardinality() == 0 { me.updateRequests("Peer.cancel") } - return me.peerImpl._cancel(me.t.requestIndexToRequest(r)) } - return true -} - -func (me *PeerConn) _cancel(r Request) bool { - return me.write(makeCancelMessage(r)) + return me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) } func (cn *PeerConn) fillWriteBuffer() { @@ -1299,6 +1311,9 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { if !c.peerChoking { c._chunksReceivedWhileExpecting++ } + if c.actualRequestState.Requests.GetCardinality() == 0 { + c.updateRequests("Peer.receiveChunk deleted request") + } } else { chunksReceived.Add("unwanted", 1) } @@ -1320,9 +1335,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if deletedRequest { c.piecesReceivedSinceLastRequestUpdate++ - if c.actualRequestState.Requests.GetCardinality() == 0 { - c.updateRequests("Peer.receiveChunk deleted request") - } c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { @@ -1490,6 +1502,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { if !c.actualRequestState.Requests.CheckedRemove(r) { return false } + c.cancelledRequests.Remove(r) for _, f := range c.callbacks.DeletedRequest { f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } diff --git a/requesting.go b/requesting.go index c1575885..0ec4151b 100644 --- a/requesting.go +++ b/requesting.go @@ -11,7 +11,6 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" request_strategy "github.com/anacrolix/torrent/request-strategy" @@ -268,18 +267,23 @@ func (p *Peer) applyRequestState(next requestState) bool { return false } next.Requests.Iterate(func(req uint32) bool { - // This could happen if the peer chokes us between the next state being generated, and us - // trying to transmit the state. - if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) { - return true + if p.cancelledRequests.Contains(req) { + log.Printf("waiting for cancelled request %v", req) + return false + } + if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { + log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]", + current.Requests.GetCardinality(), + p.cancelledRequests.GetCardinality(), + p.nominalMaxRequests(), + ) + return false } var err error more, err = p.request(req) if err != nil { panic(err) - } /* else { - log.Print(req) - } */ + } return more }) if more { diff --git a/webseed-peer.go b/webseed-peer.go index ebdd8dba..d2c46681 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -45,10 +45,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) _cancel(r Request) bool { - active, ok := ws.activeRequests[r] +func (ws *webseedPeer) _cancel(r RequestIndex) bool { + active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)] if ok { active.Cancel() + if !ws.peer.deleteRequest(r) { + panic("cancelled webseed request should exist") + } + if ws.peer.actualRequestState.Requests.GetCardinality() == 0 { + ws.peer.updateRequests("webseedPeer._cancel") + } } return true } -- 2.44.0