This might be one solution to https://github.com/anacrolix/torrent/issues/698.
}
}
p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
- p.updateRequestsTimer.Stop()
}
// peerUpdateRequestsTimerReason is the update-reason string passed to
// Peer.updateRequests when the periodic request-update timer fires. It is
// compared against Peer.needRequestUpdate to special-case timer-driven
// updates (see updateRequests and maybeUpdateActualRequestState).
const peerUpdateRequestsTimerReason = "updateRequestsTimer"
func (c *Peer) updateRequestsTimerFunc() {
c.locker().Lock()
defer c.locker().Unlock()
if c.closed.IsSet() {
return
}
- if c.needRequestUpdate != "" {
- return
- }
if c.isLowOnRequests() {
// If there are no outstanding requests, then a request update should have already run.
return
}
- c.updateRequests("updateRequestsTimer")
+ if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
+ log.Printf("spurious timer requests update [interval=%v]", d)
+ return
+ }
+ c.updateRequests(peerUpdateRequestsTimerReason)
}
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
return ret
}
// maxInt returns the largest of the given ints. It panics if called with no
// arguments (as[0] on an empty slice).
func maxInt(as ...int) int {
	best := as[0]
	for _, candidate := range as {
		if candidate > best {
			best = candidate
		}
	}
	return best
}
+
func min(as ...int64) int64 {
ret := as[0]
for _, a := range as[1:] {
type peerImpl interface {
// Trigger the actual request state to get updated
handleUpdateRequests()
- // Whether the outstanding local request cardinality is low enough to warrant an update.
- isLowOnRequests() bool
writeInterested(interested bool) bool
// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
--- /dev/null
+package torrent
+
+func (p *Peer) isLowOnRequests() bool {
+ return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
+}
+
+func (p *Peer) decPeakRequests() {
+ // // This can occur when peak requests are altered by the update request timer to be lower than
+ // // the actual number of outstanding requests. Let's let it go negative and see what happens. I
+ // // wonder what happens if maxRequests is not signed.
+ // if p.peakRequests < 1 {
+ // panic(p.peakRequests)
+ // }
+ p.peakRequests--
+}
needRequestUpdate string
requestState requestState
updateRequestsTimer *time.Timer
+ lastRequestUpdate time.Time
+ peakRequests maxRequests
lastBecameInterested time.Time
priorInterest time.Duration
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
// https://github.com/pion/datachannel/issues/59 is fixed.
const (
	// High water mark: stop filling the peer write buffer once it holds this
	// many bytes.
	writeBufferHighWaterLen = 1 << 15
	// Low water mark: half the high water mark. While the buffer is above this
	// level there may not be room to commit a full request state update (see
	// maxLocalToRemoteRequests).
	writeBufferLowWaterLen = writeBufferHighWaterLen / 2
)
// Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
// done asynchronously, so it may be that we're not able to honour backpressure from this method.
return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}
+var (
+ interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
+ requestMsgLen = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
+ // This is the maximum request count that could fit in the write buffer if it's at or below the
+ // low water mark when we run maybeUpdateActualRequestState.
+ maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
+)
+
// The actual value to use as the maximum outbound requests.
-func (cn *Peer) nominalMaxRequests() (ret maxRequests) {
- return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048))
+func (cn *Peer) nominalMaxRequests() maxRequests {
+ return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
panic("request already cancelled")
}
}
+ me.decPeakRequests()
if me.isLowOnRequests() {
me.updateRequests("Peer.cancel")
}
}
func (cn *PeerConn) fillWriteBuffer() {
- if !cn.maybeUpdateActualRequestState() {
- return
- }
+ if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen {
+ // Fully committing to our max requests requires sufficient space (see
+ // maxLocalToRemoteRequests). Flush what we have instead. We also prefer always to make
+ // requests than to do PEX or upload, so we short-circuit before handling those. Any update
+ // request reason will not be cleared, so we'll come right back here when there's space. We
+ // can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no
+ // knowledge of write buffers.
+ }
+ cn.maybeUpdateActualRequestState()
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
return
if cn.needRequestUpdate != "" {
return
}
+ if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() {
+ return
+ }
cn.needRequestUpdate = reason
cn.handleUpdateRequests()
}
// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
- if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+ if c.deleteRequest(r) {
+ c.decPeakRequests()
+ } else if !c.requestState.Cancelled.CheckedRemove(r) {
return false
}
if c.isLowOnRequests() {
return pc, ok
}
-func (pc *PeerConn) isLowOnRequests() bool {
- return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
-}
-
func (p *Peer) uncancelledRequests() uint64 {
return p.requestState.Requests.GetCardinality()
}
{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
})
}
+
+func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
+ c := qt.New(t)
+ c.Check(interestedMsgLen, qt.Equals, 5)
+ c.Check(requestMsgLen, qt.Equals, 17)
+ c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
+ c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
+}
type PeerRequestState struct {
Interested bool
- // Expecting
+ // Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they
+ // were assigned.
Requests roaring.Bitmap
// Cancelled and waiting response
Cancelled roaring.Bitmap
"container/heap"
"context"
"encoding/gob"
+ "fmt"
"reflect"
"runtime/pprof"
"time"
return
}
-func (p *Peer) maybeUpdateActualRequestState() bool {
+func (p *Peer) maybeUpdateActualRequestState() {
+ if p.closed.IsSet() {
+ return
+ }
if p.needRequestUpdate == "" {
- return true
+ return
+ }
+ if p.needRequestUpdate == peerUpdateRequestsTimerReason {
+ since := time.Since(p.lastRequestUpdate)
+ if since < updateRequestsTimerDuration {
+ panic(since)
+ }
}
- var more bool
pprof.Do(
context.Background(),
pprof.Labels("update request", p.needRequestUpdate),
func(_ context.Context) {
next := p.getDesiredRequestState()
- more = p.applyRequestState(next)
+ p.applyRequestState(next)
},
)
- return more
}
// Transmit/action the request state to the peer.
-func (p *Peer) applyRequestState(next desiredRequestState) bool {
+func (p *Peer) applyRequestState(next desiredRequestState) {
current := &p.requestState
if !p.setInterested(next.Interested) {
- return false
+ panic("insufficient write buffer")
}
more := true
requestHeap := &next.Requests
t := p.t
+ originalRequestCount := current.Requests.GetCardinality()
+ // We're either here on a timer, or because we ran out of requests. Both are valid reasons to
+ // alter peakRequests.
+ if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason {
+ panic(fmt.Sprintf(
+ "expected zero existing requests (%v) for update reason %q",
+ originalRequestCount, p.needRequestUpdate))
+ }
heap.Init(requestHeap)
for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
req := heap.Pop(requestHeap).(RequestIndex)
break
}
}
- // TODO: This may need to change, we might want to update even if there were no requests due to
- // filtering them for being recently requested already.
- p.updateRequestsTimer.Stop()
- if more {
- p.needRequestUpdate = ""
- if current.Interested {
- p.updateRequestsTimer.Reset(3 * time.Second)
- }
+ if !more {
+ // This might fail if we incorrectly determine that we can fit up to the maximum allowed
+ // requests into the available write buffer space. We don't want that to happen because it
+ // makes our peak requests dependent on how much was already in the buffer.
+ panic(fmt.Sprintf(
+ "couldn't fill apply entire request state [newRequests=%v]",
+ current.Requests.GetCardinality()-originalRequestCount))
}
- return more
+ newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount)
+ // log.Printf(
+ // "requests %v->%v (peak %v->%v) reason %q (peer %v)",
+ // originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
+ p.peakRequests = newPeakRequests
+ p.needRequestUpdate = ""
+ p.lastRequestUpdate = time.Now()
+ p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
}
+
+// This could be set to 10s to match the unchoke/request update interval recommended by some
+// specifications. I've set it shorter to trigger it more often for testing for now.
+const updateRequestsTimerDuration = 3 * time.Second
defer t.cl.unlock()
t.dataUploadDisallowed = true
for c := range t.conns {
+ // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
c.updateRequests("disallow data upload")
}
}
return err
}
-func (me *webseedPeer) isLowOnRequests() bool {
- return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
-}
-
// peerPieces returns the piece bitmap for this webseed, taken from its
// client's tracked Pieces. The returned bitmap is shared with the client, not
// a copy — presumably callers treat it as read-only; verify before mutating.
func (me *webseedPeer) peerPieces() *roaring.Bitmap {
	return &me.client.Pieces
}