// Stuff controlled by the local peer.
needRequestUpdate string
actualRequestState requestState
+ cancelledRequests roaring.Bitmap
lastBecameInterested time.Time
priorInterest time.Duration
}
func (me *Peer) cancel(r RequestIndex) bool {
- if me.deleteRequest(r) {
+ if !me.actualRequestState.Requests.Contains(r) {
+ return true
+ }
+ return me._cancel(r)
+}
+
+func (me *PeerConn) _cancel(r RequestIndex) bool {
+ if me.cancelledRequests.Contains(r) {
+ // Already cancelled and waiting for a response.
+ return true
+ }
+ if me.fastEnabled() {
+ me.cancelledRequests.Add(r)
+ } else {
+ if !me.deleteRequest(r) {
+ panic("request not existing should have been guarded")
+ }
if me.actualRequestState.Requests.GetCardinality() == 0 {
me.updateRequests("Peer.cancel")
}
- return me.peerImpl._cancel(me.t.requestIndexToRequest(r))
}
- return true
-}
-
-func (me *PeerConn) _cancel(r Request) bool {
- return me.write(makeCancelMessage(r))
+ return me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
}
func (cn *PeerConn) fillWriteBuffer() {
if !c.peerChoking {
c._chunksReceivedWhileExpecting++
}
+ if c.actualRequestState.Requests.GetCardinality() == 0 {
+ c.updateRequests("Peer.receiveChunk deleted request")
+ }
} else {
chunksReceived.Add("unwanted", 1)
}
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
if deletedRequest {
c.piecesReceivedSinceLastRequestUpdate++
- if c.actualRequestState.Requests.GetCardinality() == 0 {
- c.updateRequests("Peer.receiveChunk deleted request")
- }
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
}
for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
if !c.actualRequestState.Requests.CheckedRemove(r) {
return false
}
+ c.cancelledRequests.Remove(r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
}
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/log"
- "github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
request_strategy "github.com/anacrolix/torrent/request-strategy"
return false
}
next.Requests.Iterate(func(req uint32) bool {
- // This could happen if the peer chokes us between the next state being generated, and us
- // trying to transmit the state.
- if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
- return true
+ if p.cancelledRequests.Contains(req) {
+ log.Printf("waiting for cancelled request %v", req)
+ return false
+ }
+ if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
+ log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]",
+ current.Requests.GetCardinality(),
+ p.cancelledRequests.GetCardinality(),
+ p.nominalMaxRequests(),
+ )
+ return false
}
var err error
more, err = p.request(req)
if err != nil {
panic(err)
- } /* else {
- log.Print(req)
- } */
+ }
return more
})
if more {
return true
}
-func (ws *webseedPeer) _cancel(r Request) bool {
- active, ok := ws.activeRequests[r]
+func (ws *webseedPeer) _cancel(r RequestIndex) bool {
+ active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
if ok {
active.Cancel()
+ if !ws.peer.deleteRequest(r) {
+ panic("cancelled webseed request should exist")
+ }
+ if ws.peer.actualRequestState.Requests.GetCardinality() == 0 {
+ ws.peer.updateRequests("webseedPeer._cancel")
+ }
}
return true
}