From 1bae62fd222402b483f638cdcc27dda5a47a29ed Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Fri, 24 Dec 2021 08:55:57 +1100
Subject: [PATCH] Dynamic outbound max requests

This might be one solution to https://github.com/anacrolix/torrent/issues/698.
---
 client.go                | 12 +++++----
 misc.go                  | 10 +++++++
 peer-impl.go             |  2 --
 peer.go                  | 17 +++++++++++
 peerconn.go              | 46 +++++++++++++++++++++--------
 peerconn_test.go         |  8 ++++++
 request-strategy/peer.go |  3 ++-
 requesting.go            | 61 +++++++++++++++++++++++++++++-----------
 torrent.go               |  1 +
 webseed-peer.go          |  4 ---
 10 files changed, 125 insertions(+), 39 deletions(-)
 create mode 100644 peer.go

diff --git a/client.go b/client.go
index a6de6c08..e36ec0e9 100644
--- a/client.go
+++ b/client.go
@@ -985,23 +985,25 @@ func (p *Peer) initUpdateRequestsTimer() {
 		}
 	}
 	p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
-	p.updateRequestsTimer.Stop()
 }
 
+const peerUpdateRequestsTimerReason = "updateRequestsTimer"
+
 func (c *Peer) updateRequestsTimerFunc() {
 	c.locker().Lock()
 	defer c.locker().Unlock()
 	if c.closed.IsSet() {
 		return
 	}
-	if c.needRequestUpdate != "" {
-		return
-	}
 	if c.isLowOnRequests() {
 		// If there are no outstanding requests, then a request update should have already run.
 		return
 	}
-	c.updateRequests("updateRequestsTimer")
+	if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
+		log.Printf("spurious timer requests update [interval=%v]", d)
+		return
+	}
+	c.updateRequests(peerUpdateRequestsTimerReason)
 }
 
 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
diff --git a/misc.go b/misc.go
index db924bbe..4041b776 100644
--- a/misc.go
+++ b/misc.go
@@ -143,6 +143,16 @@ func max(as ...int64) int64 {
 	return ret
 }
 
+func maxInt(as ...int) int {
+	ret := as[0]
+	for _, a := range as[1:] {
+		if a > ret {
+			ret = a
+		}
+	}
+	return ret
+}
+
 func min(as ...int64) int64 {
 	ret := as[0]
 	for _, a := range as[1:] {
diff --git a/peer-impl.go b/peer-impl.go
index 47b4345a..e29fb439 100644
--- a/peer-impl.go
+++ b/peer-impl.go
@@ -11,8 +11,6 @@ import (
 type peerImpl interface {
 	// Trigger the actual request state to get updated
 	handleUpdateRequests()
-	// Whether the outstanding local request cardinality is low enough to warrant an update.
-	isLowOnRequests() bool
 	writeInterested(interested bool) bool
 
 	// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
diff --git a/peer.go b/peer.go
new file mode 100644
index 00000000..e1bab184
--- /dev/null
+++ b/peer.go
@@ -0,0 +1,17 @@
+package torrent
+
+func (p *Peer) isLowOnRequests() bool {
+	return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
+}
+
+func (p *Peer) decPeakRequests() {
+	// // This can occur when peak requests are altered by the update request timer to be lower than
+	// // the actual number of outstanding requests. Let's let it go negative and see what happens. I
+	// // wonder what happens if maxRequests is not signed.
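+	// // (For the minInt/maxInt arithmetic in nominalMaxRequests below to compile, maxRequests has
+	// // to be a plain alias for int, so it is signed and the decrement here really can go negative.)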
+	// if p.peakRequests < 1 {
+	// 	panic(p.peakRequests)
+	// }
+	p.peakRequests--
+}
diff --git a/peerconn.go b/peerconn.go
index 40692225..fb28dc22 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -86,6 +86,8 @@ type Peer struct {
 	needRequestUpdate    string
 	requestState         requestState
 	updateRequestsTimer  *time.Timer
+	lastRequestUpdate    time.Time
+	peakRequests         maxRequests
 	lastBecameInterested time.Time
 	priorInterest        time.Duration
@@ -445,7 +447,10 @@ func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
 
 // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
 // https://github.com/pion/datachannel/issues/59 is fixed.
-const writeBufferHighWaterLen = 1 << 15
+const (
+	writeBufferHighWaterLen = 1 << 15
+	writeBufferLowWaterLen  = writeBufferHighWaterLen / 2
+)
 
 // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
 // done asynchronously, so it may be that we're not able to honour backpressure from this method.
@@ -481,9 +486,17 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
 	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
 }
 
+var (
+	interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
+	requestMsgLen    = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
+	// This is the maximum request count that could fit in the write buffer if it's at or below the
+	// low water mark when we run maybeUpdateActualRequestState.
+	maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
+)
+
 // The actual value to use as the maximum outbound requests.
-func (cn *Peer) nominalMaxRequests() (ret maxRequests) {
-	return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048))
+func (cn *Peer) nominalMaxRequests() maxRequests {
+	return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -649,6 +662,7 @@ func (me *Peer) cancel(r RequestIndex) {
 			panic("request already cancelled")
 		}
 	}
+	me.decPeakRequests()
 	if me.isLowOnRequests() {
 		me.updateRequests("Peer.cancel")
 	}
@@ -662,9 +676,16 @@ func (me *PeerConn) _cancel(r RequestIndex) bool {
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
-	if !cn.maybeUpdateActualRequestState() {
-		return
-	}
+	if cn.messageWriter.writeBuffer.Len() > writeBufferLowWaterLen {
+		// Fully committing to our max requests requires sufficient space (see
+		// maxLocalToRemoteRequests). Flush what we have instead. We also always prefer making
+		// requests to doing PEX or uploading, so we short-circuit before handling those. Any update
+		// request reason will not be cleared, so we'll come right back here when there's space. We
+		// can't do this in maybeUpdateActualRequestState because it's a method on Peer and has no
+		// knowledge of write buffers.
+		return
+	}
+	cn.maybeUpdateActualRequestState()
 	if cn.pex.IsEnabled() {
 		if flow := cn.pex.Share(cn.write); !flow {
 			return
@@ -703,6 +724,9 @@ func (cn *Peer) updateRequests(reason string) {
 	if cn.needRequestUpdate != "" {
 		return
 	}
+	if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() {
+		return
+	}
 	cn.needRequestUpdate = reason
 	cn.handleUpdateRequests()
 }
@@ -1235,7 +1259,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
 
 // Returns true if it was valid to reject the request.
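+// A rejected request that we still considered outstanding counted towards the most recent peak,
+// so the peak is wound back along with the request itself.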
 func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
-	if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+	if c.deleteRequest(r) {
+		c.decPeakRequests()
+	} else if !c.requestState.Cancelled.CheckedRemove(r) {
 		return false
 	}
 	if c.isLowOnRequests() {
@@ -1743,10 +1768,6 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
 	return pc, ok
 }
 
-func (pc *PeerConn) isLowOnRequests() bool {
-	return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
-}
-
 func (p *Peer) uncancelledRequests() uint64 {
 	return p.requestState.Requests.GetCardinality()
 }
diff --git a/peerconn_test.go b/peerconn_test.go
index 7ecb6933..2d5aa681 100644
--- a/peerconn_test.go
+++ b/peerconn_test.go
@@ -238,3 +238,11 @@ func TestHaveAllThenBitfield(t *testing.T) {
 		{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
 	})
 }
+
+func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
+	c := qt.New(t)
+	c.Check(interestedMsgLen, qt.Equals, 5)
+	c.Check(requestMsgLen, qt.Equals, 17)
+	c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
+	c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
+}
diff --git a/request-strategy/peer.go b/request-strategy/peer.go
index 6a69535f..d5366d76 100644
--- a/request-strategy/peer.go
+++ b/request-strategy/peer.go
@@ -6,7 +6,8 @@ import (
 
 type PeerRequestState struct {
 	Interested bool
-	// Expecting
+	// Requests we've sent and are expecting responses to. TODO: This should be ordered so webseed
+	// requesters initiate in the same order they were assigned.
 	Requests roaring.Bitmap
 	// Cancelled and waiting response
 	Cancelled roaring.Bitmap
diff --git a/requesting.go b/requesting.go
index 3b7359da..f97f82b5 100644
--- a/requesting.go
+++ b/requesting.go
@@ -4,6 +4,7 @@ import (
 	"container/heap"
 	"context"
 	"encoding/gob"
+	"fmt"
 	"reflect"
 	"runtime/pprof"
 	"time"
@@ -201,31 +202,49 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
 	return
 }
 
-func (p *Peer) maybeUpdateActualRequestState() bool {
+func (p *Peer) maybeUpdateActualRequestState() {
+	if p.closed.IsSet() {
+		return
+	}
 	if p.needRequestUpdate == "" {
-		return true
+		return
+	}
+	if p.needRequestUpdate == peerUpdateRequestsTimerReason {
+		since := time.Since(p.lastRequestUpdate)
+		if since < updateRequestsTimerDuration {
+			panic(since)
+		}
 	}
-	var more bool
 	pprof.Do(
 		context.Background(),
 		pprof.Labels("update request", p.needRequestUpdate),
 		func(_ context.Context) {
 			next := p.getDesiredRequestState()
-			more = p.applyRequestState(next)
+			p.applyRequestState(next)
 		},
 	)
-	return more
 }
 
 // Transmit/action the request state to the peer.
-func (p *Peer) applyRequestState(next desiredRequestState) bool {
+func (p *Peer) applyRequestState(next desiredRequestState) {
 	current := &p.requestState
 	if !p.setInterested(next.Interested) {
-		return false
+		panic("insufficient write buffer")
 	}
 	more := true
 	requestHeap := &next.Requests
 	t := p.t
+	originalRequestCount := current.Requests.GetCardinality()
+	// We're either here on a timer, or because we ran out of requests. Both are valid reasons to
+	// alter peakRequests.
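+	// The new peak feeds back into nominalMaxRequests (which allows peakRequests*2), so a peer
+	// that keeps its pipe full can at most double its window per update, and the window shrinks
+	// again when a timer-driven update issues fewer new requests.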
+	if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason {
+		panic(fmt.Sprintf(
+			"expected zero existing requests (%v) for update reason %q",
+			originalRequestCount, p.needRequestUpdate))
+	}
 	heap.Init(requestHeap)
 	for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
 		req := heap.Pop(requestHeap).(RequestIndex)
@@ -245,14 +264,24 @@
 			break
 		}
 	}
-	// TODO: This may need to change, we might want to update even if there were no requests due to
-	// filtering them for being recently requested already.
-	p.updateRequestsTimer.Stop()
-	if more {
-		p.needRequestUpdate = ""
-		if current.Interested {
-			p.updateRequestsTimer.Reset(3 * time.Second)
-		}
+	if !more {
+		// This might fail if we incorrectly determine that we can fit up to the maximum allowed
+		// requests into the available write buffer space. We don't want that to happen because it
+		// makes our peak requests dependent on how much was already in the buffer.
+		panic(fmt.Sprintf(
+			"couldn't apply entire request state [newRequests=%v]",
+			current.Requests.GetCardinality()-originalRequestCount))
 	}
-	return more
+	newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount)
+	// log.Printf(
+	// 	"requests %v->%v (peak %v->%v) reason %q (peer %v)",
+	// 	originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
+	p.peakRequests = newPeakRequests
+	p.needRequestUpdate = ""
+	p.lastRequestUpdate = time.Now()
+	p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
 }
+
+// This could be set to 10s to match the unchoke/request update interval recommended by some
+// specifications. I've set it shorter to trigger it more often for testing for now.
+const updateRequestsTimerDuration = 3 * time.Second
diff --git a/torrent.go b/torrent.go
index c0c57564..2e6fb6ef 100644
--- a/torrent.go
+++ b/torrent.go
@@ -2241,6 +2241,7 @@ func (t *Torrent) DisallowDataUpload() {
 	defer t.cl.unlock()
 	t.dataUploadDisallowed = true
 	for c := range t.conns {
+		// TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
 		c.updateRequests("disallow data upload")
 	}
 }
diff --git a/webseed-peer.go b/webseed-peer.go
index f2ef7a81..2486aba0 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -187,10 +187,6 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
 	return err
 }
 
-func (me *webseedPeer) isLowOnRequests() bool {
-	return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
-}
-
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
 	return &me.client.Pieces
 }
-- 
2.44.0
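
A quick illustration for reviewers (not part of the patch): a minimal standalone sketch of how the
request window produced by nominalMaxRequests could evolve under this change. It assumes the peer
allows 250 outstanding requests (a common reqq value) and maxLocalToRemoteRequests working out to
(32768 - 16384 - 5) / 17 = 963 per the constants above; the names nominalMax, peerMax, and bufMax
are made up for the example.

	package main

	import "fmt"

	// nominalMax mirrors maxRequests(maxInt(1, minInt(peerMax, peak*2, bufMax))) from the patch.
	func nominalMax(peak, peerMax, bufMax int) int {
		window := peerMax
		if peak*2 < window {
			window = peak * 2
		}
		if bufMax < window {
			window = bufMax
		}
		if window < 1 {
			window = 1
		}
		return window
	}

	func main() {
		peak := 0
		for i := 0; i < 10; i++ {
			window := nominalMax(peak, 250, 963)
			fmt.Printf("update %d: window=%d\n", i, window)
			// Assume every request was serviced before the next update, so the new peak
			// equals the window that was just filled.
			peak = window
		}
	}

Starting from zero the window doubles each update (1, 2, 4, ...) until it reaches the peer's 250
cap on the ninth update, while a peer that only ever services a few requests per interval keeps
the window pinned at roughly twice that number.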