// indexable with the memory space available.
type (
maxRequests = int
- requestState = request_strategy.PeerNextRequestState
+ requestState = request_strategy.PeerRequestState
)
type Peer struct {
// Stuff controlled by the local peer.
needRequestUpdate string
- actualRequestState requestState
+ requestState requestState
updateRequestsTimer *time.Timer
- cancelledRequests roaring.Bitmap
lastBecameInterested time.Time
priorInterest time.Duration
choking bool
piecesReceivedSinceLastRequestUpdate maxRequests
maxPiecesReceivedBetweenRequestUpdates maxRequests
- // Chunks that we might reasonably expect to receive from the peer. Due to
- // latency, buffering, and implementation differences, we may receive
- // chunks that are no longer in the set of requests actually want.
+ // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+ // and implementation differences, we may receive chunks that are no longer in the set of
+ // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
validReceiveChunks map[RequestIndex]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
}
func (cn *Peer) expectingChunks() bool {
- if cn.actualRequestState.Requests.IsEmpty() {
+ if cn.requestState.Requests.IsEmpty() {
return false
}
- if !cn.actualRequestState.Interested {
+ if !cn.requestState.Interested {
return false
}
if !cn.peerChoking {
haveAllowedFastRequests := false
cn.peerAllowedFast.Iterate(func(i uint32) bool {
haveAllowedFastRequests = roaringBitmapRangeCardinality(
- &cn.actualRequestState.Requests,
+ &cn.requestState.Requests,
cn.t.pieceRequestIndexOffset(pieceIndex(i)),
cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
) == 0
func (cn *Peer) cumInterest() time.Duration {
ret := cn.priorInterest
- if cn.actualRequestState.Interested {
+ if cn.requestState.Interested {
ret += time.Since(cn.lastBecameInterested)
}
return ret
c := func(b byte) {
ret += string([]byte{b})
}
- if cn.actualRequestState.Interested {
+ if cn.requestState.Interested {
c('i')
}
if cn.choking {
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
- cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ cn.requestState.Requests.Iterate(func(x uint32) bool {
ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
return true
})
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
- " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+ " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
- cn.actualRequestState.Requests.GetCardinality(),
- cn.cancelledRequests.GetCardinality(),
+ cn.requestState.Requests.GetCardinality(),
+ cn.requestState.Cancelled.GetCardinality(),
cn.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
}
func (cn *Peer) setInterested(interested bool) bool {
- if cn.actualRequestState.Interested == interested {
+ if cn.requestState.Interested == interested {
return true
}
- cn.actualRequestState.Interested = interested
+ cn.requestState.Interested = interested
if interested {
cn.lastBecameInterested = time.Now()
} else if !cn.lastBecameInterested.IsZero() {
if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
// This could occur if we made a request with the fast extension, and then got choked and
// haven't had the request rejected yet.
- if !cn.actualRequestState.Requests.Contains(r) {
+ if !cn.requestState.Requests.Contains(r) {
panic("peer choking and piece not allowed fast")
}
}
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
- if cn.actualRequestState.Requests.Contains(r) {
+ if cn.requestState.Requests.Contains(r) {
return true, nil
}
- if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+ if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
- cn.actualRequestState.Requests.Add(r)
+ cn.requestState.Requests.Add(r)
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[RequestIndex]int)
}
}
func (me *Peer) cancel(r RequestIndex) {
- if !me.actualRequestState.Requests.Contains(r) {
- panic(r)
+ if !me.deleteRequest(r) {
+ panic("request not existing should have been guarded")
+ }
+ if me._cancel(r) {
+ if !me.requestState.Cancelled.CheckedAdd(r) {
+ panic("request already cancelled")
+ }
+ }
+ if me.isLowOnRequests() {
+ me.updateRequests("Peer.cancel")
}
- me._cancel(r)
}
-func (me *PeerConn) _cancel(r RequestIndex) {
- if me.cancelledRequests.Contains(r) {
- // Already cancelled and waiting for a response.
- panic(r)
- }
+func (me *PeerConn) _cancel(r RequestIndex) bool {
+ me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
// Transmission does not send rejects for received cancels. See
// https://github.com/transmission/transmission/pull/2275.
- if me.fastEnabled() && !me.remoteIsTransmission() {
- me.cancelledRequests.Add(r)
- } else {
- if !me.deleteRequest(r) {
- panic("request not existing should have been guarded")
- }
- if me.isLowOnRequests() {
- me.updateRequests("Peer.cancel")
- }
- }
- me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
+ return me.fastEnabled() && !me.remoteIsTransmission()
}
func (cn *PeerConn) fillWriteBuffer() {
c.deleteAllRequests()
} else {
// We don't decrement pending requests here, let's wait for the peer to either
- // reject or satisfy the outstanding requests. Additionally some peers may unchoke
+ // reject or satisfy the outstanding requests. Additionally, some peers may unchoke
// us and resume where they left off, we don't want to have piled on to those chunks
- // in the meanwhile. I think a peers ability to abuse this should be limited: they
+ // in the meanwhile. I think a peer's ability to abuse this should be limited: they
// could let us request a lot of stuff, then choke us and never reject, but they're
// only a single peer, our chunk balancing should smooth over this abuse.
}
c.peerChoking = true
- // We can now reset our interest. I think we do this after setting the flag in case the
- // peerImpl updates synchronously (webseeds?).
- if !c.actualRequestState.Requests.IsEmpty() {
- c.updateRequests("choked")
- }
c.updateExpectingChunks()
case pp.Unchoke:
if !c.peerChoking {
}
c.peerChoking = false
preservedCount := 0
- c.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ c.requestState.Requests.Iterate(func(x uint32) bool {
if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
preservedCount++
}
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Reject:
- c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
+ req := newRequestFromMessage(&msg)
+ if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
+ log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
+ err = fmt.Errorf("received invalid reject [request=%v]", req)
+ }
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
}
}
-func (c *Peer) remoteRejectedRequest(r RequestIndex) {
- if c.deleteRequest(r) {
- if c.isLowOnRequests() {
- c.updateRequests("Peer.remoteRejectedRequest")
- }
- c.decExpectedChunkReceive(r)
+// Returns true if it was valid to reject the request.
+func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+ if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+ return false
+ }
+ if c.isLowOnRequests() {
+ c.updateRequests("Peer.remoteRejectedRequest")
}
+ c.decExpectedChunkReceive(r)
+ return true
}
func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
// The request needs to be deleted immediately to prevent cancels occurring asynchronously when
// have actually already received the piece, while we have the Client unlocked to write the data
// out.
- deletedRequest := false
+ intended := false
{
- if c.actualRequestState.Requests.Contains(req) {
+ if c.requestState.Requests.Contains(req) {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
}
// Request has been satisfied.
- if c.deleteRequest(req) {
- deletedRequest = true
+ if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
+ intended = true
if !c.peerChoking {
c._chunksReceivedWhileExpecting++
}
c.updateRequests("Peer.receiveChunk deleted request")
}
} else {
- chunksReceived.Add("unwanted", 1)
+ chunksReceived.Add("unintended", 1)
}
}
// Do we actually want this chunk?
if t.haveChunk(ppReq) {
// panic(fmt.Sprintf("%+v", ppReq))
- chunksReceived.Add("wasted", 1)
+ chunksReceived.Add("redundant", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
- if deletedRequest {
+ if intended {
c.piecesReceivedSinceLastRequestUpdate++
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
}
return c.peerPieces().Intersects(&c.t._pendingPieces)
}
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
func (c *Peer) deleteRequest(r RequestIndex) bool {
- if !c.actualRequestState.Requests.CheckedRemove(r) {
+ if !c.requestState.Requests.CheckedRemove(r) {
return false
}
- c.cancelledRequests.Remove(r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
}
c.updateExpectingChunks()
- if c.t.requestingPeer(r) == c {
- delete(c.t.pendingRequests, r)
- delete(c.t.lastRequested, r)
+ if c.t.requestingPeer(r) != c {
+ panic("only one peer should have a given request at a time")
}
+ delete(c.t.pendingRequests, r)
+ delete(c.t.lastRequested, r)
return true
}
func (c *Peer) deleteAllRequests() {
- c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
- c.deleteRequest(x)
+ c.requestState.Requests.Clone().Iterate(func(x uint32) bool {
+ if !c.deleteRequest(x) {
+ panic("request should exist")
+ }
return true
})
- if !c.actualRequestState.Requests.IsEmpty() {
- panic(c.actualRequestState.Requests.GetCardinality())
+ if !c.requestState.Requests.IsEmpty() {
+ panic(c.requestState.Requests.GetCardinality())
}
}
}
func (pc *PeerConn) isLowOnRequests() bool {
- return pc.actualRequestState.Requests.IsEmpty()
+ return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
}
func (p *Peer) uncancelledRequests() uint64 {
- return p.actualRequestState.Requests.GetCardinality() - p.cancelledRequests.GetCardinality()
+ return p.requestState.Requests.GetCardinality()
}
func (pc *PeerConn) remoteIsTransmission() bool {